This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8a291efb0c50302d582844a948f489698f76ccbb Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Mon Apr 1 16:14:10 2024 +0800 [improvement](spill) avoid spill if memory is enough (#33075) --- .../exec/partitioned_aggregation_sink_operator.cpp | 30 +++++++++------ .../exec/partitioned_aggregation_sink_operator.h | 3 -- .../partitioned_aggregation_source_operator.cpp | 29 ++++++--------- .../exec/partitioned_aggregation_source_operator.h | 2 - be/src/pipeline/exec/spill_sort_sink_operator.cpp | 43 +++++++++++----------- be/src/pipeline/exec/spill_sort_sink_operator.h | 3 -- .../pipeline/exec/spill_sort_source_operator.cpp | 29 ++++----------- be/src/pipeline/exec/spill_sort_source_operator.h | 3 -- be/src/pipeline/pipeline_x/dependency.h | 2 + be/src/vec/common/sort/sorter.cpp | 3 +- be/src/vec/spill/spill_stream.cpp | 17 +++++++-- 11 files changed, 76 insertions(+), 88 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 3207c109589..d44c35a76a9 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -70,12 +70,16 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat if (Base::_closed) { return Status::OK(); } +<<<<<<< HEAD { std::unique_lock<std::mutex> lk(_spill_lock); if (_is_spilling) { _spill_cv.wait(lk); } } +======= + dec_running_big_mem_op_num(state); +>>>>>>> bb11955709 ([improvement](spill) avoid spill if memory is enough (#33075)) return Base::close(state, exec_status); } @@ -166,13 +170,17 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: auto* runtime_state = local_state._runtime_state.get(); RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false)); if (eos) { - LOG(INFO) << "agg node " << id() << " sink eos"; - if (revocable_mem_size(state) > 0) { - RETURN_IF_ERROR(revoke_memory(state)); - } else { - for (auto& partition : local_state._shared_state->spill_partitions) { - RETURN_IF_ERROR(partition->finish_current_spilling(eos)); + if (local_state._shared_state->is_spilled) { + if (revocable_mem_size(state) > 0) { + RETURN_IF_ERROR(revoke_memory(state)); + } else { + for (auto& partition : local_state._shared_state->spill_partitions) { + RETURN_IF_ERROR(partition->finish_current_spilling(eos)); + } + local_state._dependency->set_ready_to_read(); + local_state._finish_dependency->set_ready(); } + } else { local_state._dependency->set_ready_to_read(); } } @@ -229,8 +237,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory" << ", eos: " << _eos; RETURN_IF_ERROR(Base::_shared_state->sink_status); - DCHECK(!_is_spilling); - _is_spilling = true; + if (!_shared_state->is_spilled) { + _shared_state->is_spilled = true; + profile()->add_info_string("Spilled", "true"); + } // TODO: spill thread may set_ready before the task::execute thread put the task to blocked state if (!_eos) { @@ -240,7 +250,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { Status status; Defer defer {[&]() { if (!status.ok()) { - _is_spilling = false; if (!_eos) { Base::_dependency->Dependency::set_ready(); } @@ -269,15 +278,12 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { << ", eos: " << _eos; } { - std::unique_lock<std::mutex> lk(_spill_lock); - _is_spilling = false; if (_eos) { Base::_dependency->set_ready_to_read(); _finish_dependency->set_ready(); } else { Base::_dependency->Dependency::set_ready(); } - _spill_cv.notify_one(); } }}; auto* runtime_state = _runtime_state.get(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 5e617386812..542046556ec 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -272,9 +272,6 @@ public: bool _eos = false; std::shared_ptr<Dependency> _finish_dependency; - bool _is_spilling = false; - std::mutex _spill_lock; - std::condition_variable _spill_cv; /// Resources in shared state will be released when the operator is closed, /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 960decdb951..f5eceac338c 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -88,12 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - { - std::unique_lock<std::mutex> lk(_merge_spill_lock); - if (_is_merging) { - _merge_spill_cv.wait(lk); - } - } + dec_running_big_mem_op_num(state); return Base::close(state); } PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool, @@ -131,13 +126,16 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._status); - RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state)); + if (local_state._shared_state->is_spilled) { + RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state)); - /// When `_is_merging` is true means we are reading spilled data and merging the data into hash table. - if (local_state._is_merging) { - return Status::OK(); + /// When `_is_merging` is true means we are reading spilled data and merging the data into hash table. + if (local_state._is_merging) { + return Status::OK(); + } } + // not spilled in sink or current partition still has data auto* runtime_state = local_state._runtime_state.get(); RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, eos)); if (local_state._runtime_state) { @@ -146,7 +144,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: local_state.update_profile(source_local_state->profile()); } if (*eos) { - if (!local_state._shared_state->spill_partitions.empty()) { + if (local_state._shared_state->is_spilled && + !local_state._shared_state->spill_partitions.empty()) { *eos = false; } } @@ -218,12 +217,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime } Base::_shared_state->in_mem_shared_state->aggregate_data_container ->init_once(); - { - std::unique_lock<std::mutex> lk(_merge_spill_lock); - _is_merging = false; - _dependency->Dependency::set_ready(); - _merge_spill_cv.notify_one(); - } + _is_merging = false; + _dependency->Dependency::set_ready(); }}; bool has_agg_data = false; auto& parent = Base::_parent->template cast<Parent>(); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index ac63402f227..eff1e7179c8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -60,8 +60,6 @@ protected: std::future<Status> _spill_merge_future; bool _current_partition_eos = true; bool _is_merging = false; - std::mutex _merge_spill_lock; - std::condition_variable _merge_spill_cv; /// Resources in shared state will be released when the operator is closed, /// but there may be asynchronous spilling tasks at this time, which can lead to conflicts. diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index c586a8e5e56..7764aa948b9 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -74,11 +74,9 @@ Status SpillSortSinkLocalState::open(RuntimeState* state) { return Status::OK(); } Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_status) { - { - std::unique_lock<std::mutex> lk(_spill_lock); - if (_is_spilling) { - _spill_cv.wait(lk); - } + auto& parent = Base::_parent->template cast<Parent>(); + if (parent._enable_spill) { + dec_running_big_mem_op_num(state); } return Status::OK(); } @@ -172,9 +170,16 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc local_state._shared_state->in_mem_shared_state->sorter->data_size()); if (eos) { if (_enable_spill) { - if (revocable_mem_size(state) > 0) { - RETURN_IF_ERROR(revoke_memory(state)); + if (local_state._shared_state->is_spilled) { + if (revocable_mem_size(state) > 0) { + RETURN_IF_ERROR(revoke_memory(state)); + } else { + local_state._dependency->set_ready_to_read(); + local_state._finish_dependency->set_ready(); + } } else { + RETURN_IF_ERROR( + local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read()); local_state._dependency->set_ready_to_read(); } } else { @@ -186,8 +191,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc return Status::OK(); } Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { - DCHECK(!_is_spilling); - _is_spilling = true; + if (!_shared_state->is_spilled) { + _shared_state->is_spilled = true; + profile()->add_info_string("Spilled", "true"); + } LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory" << ", eos: " << _eos; @@ -243,17 +250,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { _shared_state->clear(); } - { - std::unique_lock<std::mutex> lk(_spill_lock); - _spilling_stream.reset(); - _is_spilling = false; - if (_eos) { - _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - _dependency->Dependency::set_ready(); - } - _spill_cv.notify_one(); + _spilling_stream.reset(); + if (_eos) { + _dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + _dependency->Dependency::set_ready(); } }}; @@ -288,7 +290,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { return Status::OK(); }); if (!status.ok()) { - _is_spilling = false; _spilling_stream->end_spill(status); if (!_eos) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index ae5a3bcb8c7..d66215411aa 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -62,11 +62,8 @@ private: RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr; bool _eos = false; - bool _is_spilling = false; vectorized::SpillStreamSPtr _spilling_stream; std::shared_ptr<Dependency> _finish_dependency; - std::mutex _spill_lock; - std::condition_variable _spill_cv; }; class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocalState> { diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index d249b3be56e..417ff704bc6 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -58,17 +58,10 @@ Status SpillSortLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - { - std::unique_lock<std::mutex> lk(_merge_spill_lock); - if (_is_merging) { - _merge_spill_cv.wait(lk); - } + if (Base::_shared_state->enable_spill) { + dec_running_big_mem_op_num(state); } RETURN_IF_ERROR(Base::close(state)); - for (auto& stream : _current_merging_streams) { - (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); - } - _current_merging_streams.clear(); return Status::OK(); } int SpillSortLocalState::_calc_spill_blocks_to_merge() const { @@ -78,14 +71,11 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const { Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) { auto& parent = Base::_parent->template cast<Parent>(); LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data"; - DCHECK(!_is_merging); - _is_merging = true; _dependency->Dependency::block(); Status status; Defer defer {[&]() { if (!status.ok()) { - _is_merging = false; _dependency->Dependency::set_ready(); } }}; @@ -108,12 +98,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } else { LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data finish"; } - { - std::unique_lock<std::mutex> lk(_merge_spill_lock); - _is_merging = false; - _dependency->Dependency::set_ready(); - _merge_spill_cv.notify_one(); - } + _dependency->Dependency::set_ready(); }}; vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; @@ -252,15 +237,15 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Bloc SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_ERROR(local_state._status); - if (!local_state.Base::_shared_state->enable_spill) { - RETURN_IF_ERROR( - _sort_source_operator->get_block(local_state._runtime_state.get(), block, eos)); - } else { + if (local_state.Base::_shared_state->enable_spill && local_state._shared_state->is_spilled) { if (!local_state._merger) { return local_state.initiate_merge_sort_spill_streams(state); } else { RETURN_IF_ERROR(local_state._merger->get_next(block, eos)); } + } else { + RETURN_IF_ERROR( + _sort_source_operator->get_block(local_state._runtime_state.get(), block, eos)); } local_state.reached_limit(block, eos); return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index 8132dd5a56c..a20eb57889b 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -65,9 +65,6 @@ protected: int64_t _external_sort_bytes_threshold = 134217728; // 128M std::vector<vectorized::SpillStreamSPtr> _current_merging_streams; std::unique_ptr<vectorized::VSortedRunMerger> _merger; - bool _is_merging = false; - std::mutex _merge_spill_lock; - std::condition_variable _merge_spill_cv; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; // counters for spill merge sort diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 77cd121be5e..1af48748d6c 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -397,6 +397,7 @@ struct PartitionedAggSharedState : public BasicSharedState, size_t partition_count; size_t max_partition_index; Status sink_status; + bool is_spilled = false; std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions; size_t get_partition_index(size_t hash_value) const { @@ -470,6 +471,7 @@ struct SpillSortSharedState : public BasicSharedState, SortSharedState* in_mem_shared_state = nullptr; bool enable_spill = false; + bool is_spilled = false; Status sink_status; std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 53fc2011232..db3cca8bf09 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -63,6 +63,7 @@ void MergeSorterState::reset() { cursors_.swap(empty_cursors); std::vector<Block> empty_blocks(0); sorted_blocks_.swap(empty_blocks); + unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); in_mem_sorted_bocks_size_ = 0; } Status MergeSorterState::add_sorted_block(Block& block) { @@ -70,8 +71,8 @@ Status MergeSorterState::add_sorted_block(Block& block) { if (0 == rows) { return Status::OK(); } - sorted_blocks_.emplace_back(std::move(block)); in_mem_sorted_bocks_size_ += block.bytes(); + sorted_blocks_.emplace_back(std::move(block)); num_rows_ += rows; return Status::OK(); } diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index d08b63df40b..f245f8fa309 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -68,8 +68,14 @@ void SpillStream::close() { read_promise_.reset(); } - (void)writer_->close(); - (void)reader_->close(); + if (writer_) { + (void)writer_->close(); + writer_.reset(); + } + if (reader_) { + (void)reader_->close(); + reader_.reset(); + } } const std::string& SpillStream::get_spill_root_dir() const { @@ -100,13 +106,16 @@ Status SpillStream::spill_block(const Block& block, bool eof) { size_t written_bytes = 0; RETURN_IF_ERROR(writer_->write(block, written_bytes)); if (eof) { - return writer_->close(); + RETURN_IF_ERROR(writer_->close()); + writer_.reset(); } return Status::OK(); } Status SpillStream::spill_eof() { - return writer_->close(); + RETURN_IF_ERROR(writer_->close()); + writer_.reset(); + return Status::OK(); } Status SpillStream::read_next_block_sync(Block* block, bool* eos) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org