This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2c6809df05c034a1ff5cc0d8c568968e15cdd594 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Wed Apr 24 17:54:11 2024 +0800 [improvement](spill) improve config of spill thread pool (#33992) --- be/src/common/config.cpp | 13 +- be/src/common/config.h | 4 +- .../exec/partitioned_aggregation_sink_operator.cpp | 2 +- .../exec/partitioned_aggregation_sink_operator.h | 46 +++---- .../partitioned_aggregation_source_operator.cpp | 2 +- .../exec/partitioned_hash_join_probe_operator.cpp | 12 +- .../exec/partitioned_hash_join_sink_operator.cpp | 4 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 140 ++++++++++----------- .../pipeline/exec/spill_sort_source_operator.cpp | 3 +- be/src/pipeline/pipeline_x/dependency.cpp | 1 - be/src/pipeline/pipeline_x/dependency.h | 6 +- be/src/vec/spill/spill_stream.cpp | 64 ++-------- be/src/vec/spill/spill_stream.h | 10 +- be/src/vec/spill/spill_stream_manager.cpp | 25 +--- be/src/vec/spill/spill_stream_manager.h | 10 +- 15 files changed, 120 insertions(+), 222 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index aa50787d5b5..85d17c08e74 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1164,10 +1164,15 @@ DEFINE_String(spill_storage_root_path, ""); DEFINE_String(spill_storage_limit, "20%"); // 20% DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s DEFINE_mInt32(spill_gc_file_count, "2000"); -DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2"); -DEFINE_Int32(spill_io_thread_pool_queue_size, "1024"); -DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2"); -DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024"); +DEFINE_Int32(spill_io_thread_pool_thread_num, "-1"); +DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool { + if (config == -1) { + CpuInfo::init(); + spill_io_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() * 2); + } + return true; +}); +DEFINE_Int32(spill_io_thread_pool_queue_size, "102400"); DEFINE_mBool(check_segment_when_build_rowset_meta, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 1e2f71c452b..d2f95653f1e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1253,10 +1253,8 @@ DECLARE_String(spill_storage_root_path); DECLARE_String(spill_storage_limit); DECLARE_mInt32(spill_gc_interval_ms); DECLARE_mInt32(spill_gc_file_count); -DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num); +DECLARE_Int32(spill_io_thread_pool_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); -DECLARE_Int32(spill_async_task_thread_pool_thread_num); -DECLARE_Int32(spill_async_task_thread_pool_queue_size); DECLARE_mBool(check_segment_when_build_rowset_meta); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 55a0650dc1f..78079a0ddf8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -253,7 +253,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { MonotonicStopWatch submit_timer; submit_timer.start(); - status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( [this, &parent, state, query_id, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 7ec582905d0..1755cd866f2 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -83,7 +83,7 @@ public: for (int i = 0; i < Base::_shared_state->partition_count && !state->is_cancelled(); ++i) { if (spill_infos[i].keys_.size() >= spill_batch_rows) { - status = _async_spill_partition_and_wait( + status = _spill_partition( state, context, Base::_shared_state->spill_partitions[i], spill_infos[i].keys_, spill_infos[i].values_, nullptr, false); RETURN_IF_ERROR(status); @@ -98,13 +98,13 @@ public: auto spill_null_key_data = (hash_null_key_data && i == Base::_shared_state->partition_count - 1); if (spill_infos[i].keys_.size() > 0 || spill_null_key_data) { - status = _async_spill_partition_and_wait( - state, context, Base::_shared_state->spill_partitions[i], - spill_infos[i].keys_, spill_infos[i].values_, - spill_null_key_data ? hash_table.template get_null_key_data< - vectorized::AggregateDataPtr>() - : nullptr, - true); + status = _spill_partition(state, context, Base::_shared_state->spill_partitions[i], + spill_infos[i].keys_, spill_infos[i].values_, + spill_null_key_data + ? hash_table.template get_null_key_data< + vectorized::AggregateDataPtr>() + : nullptr, + true); RETURN_IF_ERROR(status); } } @@ -120,12 +120,10 @@ public: } template <typename HashTableCtxType, typename KeyType> - Status _async_spill_partition_and_wait(RuntimeState* state, HashTableCtxType& context, - AggSpillPartitionSPtr& spill_partition, - std::vector<KeyType>& keys, - std::vector<vectorized::AggregateDataPtr>& values, - const vectorized::AggregateDataPtr null_key_data, - bool is_last) { + Status _spill_partition(RuntimeState* state, HashTableCtxType& context, + AggSpillPartitionSPtr& spill_partition, std::vector<KeyType>& keys, + std::vector<vectorized::AggregateDataPtr>& values, + const vectorized::AggregateDataPtr null_key_data, bool is_last) { vectorized::SpillStreamSPtr spill_stream; auto status = spill_partition->get_spill_stream(state, Base::_parent->node_id(), Base::profile(), spill_stream); @@ -148,27 +146,15 @@ public: keys.clear(); values.clear(); } - status = spill_stream->prepare_spill(); RETURN_IF_ERROR(status); - status = ExecEnv::GetInstance() - ->spill_stream_mgr() - ->get_spill_io_thread_pool(spill_stream->get_spill_root_dir()) - ->submit_func([this, state, &spill_stream] { - (void)state; // avoid ut compile error - SCOPED_ATTACH_TASK(state); - SCOPED_TIMER(_spill_write_disk_timer); - Status status; - Defer defer {[&]() { spill_stream->end_spill(status); }}; - status = spill_stream->spill_block(state, block_, false); - return status; - }); - if (!status.ok()) { - spill_stream->end_spill(status); + { + SCOPED_TIMER(_spill_write_disk_timer); + status = spill_stream->spill_block(state, block_, false); } RETURN_IF_ERROR(status); - status = spill_partition->wait_spill(state); + status = spill_partition->flush_if_full(); _reset_tmp_data(); return status; } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index a5753ef7654..ff4795f2079 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -211,7 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime submit_timer.start(); RETURN_IF_ERROR( - ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( + ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( [this, state, query_id, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 2f766511984..0b52e41df28 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -175,8 +175,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state _spill_write_wait_io_timer); } - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( - build_spilling_stream->get_spill_root_dir()); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); MonotonicStopWatch submit_timer; @@ -230,8 +229,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat _spill_write_wait_io_timer); } - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( - spilling_stream->get_spill_root_dir()); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); auto& blocks = _probe_blocks[partition_index]; auto& partitioned_block = _partitioned_blocks[partition_index]; @@ -296,7 +294,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_index) { auto& build_spilling_stream = _shared_state->spilled_streams[partition_index]; if (build_spilling_stream) { - build_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(build_spilling_stream->spill_eof()); build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, _spill_read_bytes, _spill_read_wait_io_timer); @@ -305,7 +302,6 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in auto& probe_spilling_stream = _probe_spilling_streams[partition_index]; if (probe_spilling_stream) { - probe_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(probe_spilling_stream->spill_eof()); probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, _spill_read_bytes, _spill_read_wait_io_timer); @@ -387,7 +383,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti _dependency->set_ready(); }; - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); has_data = true; _dependency->block(); @@ -453,7 +449,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti _dependency->set_ready(); }; - auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); _dependency->block(); has_data = true; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index b8454c19bf3..ade9096388b 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -219,7 +219,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta _dependency->set_ready(); }; - auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); return thread_pool->submit_func(spill_func); } @@ -247,7 +247,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { DCHECK(spilling_stream != nullptr); auto* spill_io_pool = - ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool(); + ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index f5e74826bb6..af505099e82 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -233,80 +233,74 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { MonotonicStopWatch submit_timer; submit_timer.start(); - status = ExecEnv::GetInstance() - ->spill_stream_mgr() - ->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) - ->submit_func([this, state, query_id, &parent, execution_context, - submit_timer] { - auto execution_context_lock = execution_context.lock(); - if (!execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) - << " execution_context released, maybe query was cancelled."; - return Status::OK(); - } - - _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); - SCOPED_ATTACH_TASK(state); - Defer defer {[&]() { - if (!_shared_state->sink_status.ok() || state->is_cancelled()) { - if (!_shared_state->sink_status.ok()) { - LOG(WARNING) << "query " << print_id(query_id) << " sort node " - << _parent->id() << " revoke memory error: " - << _shared_state->sink_status; - } - _shared_state->close(); - } else { - VLOG_DEBUG << "query " << print_id(query_id) << " sort node " - << _parent->id() << " revoke memory finish"; - } - - _spilling_stream->end_spill(_shared_state->sink_status); - if (!_shared_state->sink_status.ok()) { - _shared_state->close(); - } - - _spilling_stream.reset(); - if (_eos) { - _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); - } else { - _dependency->Dependency::set_ready(); - } - }}; - - _shared_state->sink_status = parent._sort_sink_operator->prepare_for_spill( - _runtime_state.get()); - RETURN_IF_ERROR(_shared_state->sink_status); - - auto* sink_local_state = _runtime_state->get_sink_local_state(); - update_profile(sink_local_state->profile()); - - bool eos = false; - vectorized::Block block; - while (!eos && !state->is_cancelled()) { - { - SCOPED_TIMER(_spill_merge_sort_timer); - _shared_state->sink_status = - parent._sort_sink_operator->merge_sort_read_for_spill( - _runtime_state.get(), &block, - _shared_state->spill_block_batch_row_count, &eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - { - SCOPED_TIMER(Base::_spill_timer); - _shared_state->sink_status = - _spilling_stream->spill_block(state, block, eos); - } - RETURN_IF_ERROR(_shared_state->sink_status); - block.clear_column_data(); - } - parent._sort_sink_operator->reset(_runtime_state.get()); - - return Status::OK(); - }); - if (!status.ok()) { - _spilling_stream->end_spill(status); + status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( + [this, state, query_id, &parent, execution_context, submit_timer] { + auto execution_context_lock = execution_context.lock(); + if (!execution_context_lock) { + LOG(INFO) << "query " << print_id(query_id) + << " execution_context released, maybe query was cancelled."; + return Status::OK(); + } + _spill_wait_in_queue_timer->update(submit_timer.elapsed_time()); + SCOPED_ATTACH_TASK(state); + Defer defer {[&]() { + if (!_shared_state->sink_status.ok() || state->is_cancelled()) { + if (!_shared_state->sink_status.ok()) { + LOG(WARNING) << "query " << print_id(query_id) << " sort node " + << _parent->id() + << " revoke memory error: " << _shared_state->sink_status; + } + _shared_state->close(); + } else { + VLOG_DEBUG << "query " << print_id(query_id) << " sort node " + << _parent->id() << " revoke memory finish"; + } + + if (!_shared_state->sink_status.ok()) { + _shared_state->close(); + } + + _spilling_stream.reset(); + if (_eos) { + _dependency->set_ready_to_read(); + _finish_dependency->set_ready(); + } else { + _dependency->Dependency::set_ready(); + } + }}; + + _shared_state->sink_status = + parent._sort_sink_operator->prepare_for_spill(_runtime_state.get()); + RETURN_IF_ERROR(_shared_state->sink_status); + + auto* sink_local_state = _runtime_state->get_sink_local_state(); + update_profile(sink_local_state->profile()); + + bool eos = false; + vectorized::Block block; + while (!eos && !state->is_cancelled()) { + { + SCOPED_TIMER(_spill_merge_sort_timer); + _shared_state->sink_status = + parent._sort_sink_operator->merge_sort_read_for_spill( + _runtime_state.get(), &block, + _shared_state->spill_block_batch_row_count, &eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + { + SCOPED_TIMER(Base::_spill_timer); + _shared_state->sink_status = + _spilling_stream->spill_block(state, block, eos); + } + RETURN_IF_ERROR(_shared_state->sink_status); + block.clear_column_data(); + } + parent._sort_sink_operator->reset(_runtime_state.get()); + + return Status::OK(); + }); + if (!status.ok()) { if (!_eos) { Base::_dependency->Dependency::set_ready(); } diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 107868f968d..115f40a3636 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -148,7 +148,6 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat _status = tmp_stream->prepare_spill(); RETURN_IF_ERROR(_status); - Defer defer {[&]() { tmp_stream->end_spill(_status); }}; _shared_state->sorted_streams.emplace_back(tmp_stream); bool eos = false; @@ -173,7 +172,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } return Status::OK(); }; - return ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( + return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func( spill_func); } diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 6dccf15dbd8..ec890dac7f1 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -231,7 +231,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id, } void AggSpillPartition::close() { if (spilling_stream_) { - (void)spilling_stream_->wait_spill(); spilling_stream_.reset(); } for (auto& stream : spill_streams_) { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index ebd44bec4d2..d663b0aae8b 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -445,11 +445,9 @@ struct AggSpillPartition { Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile, vectorized::SpillStreamSPtr& spilling_stream); - // wait for current bock spilling to finish - Status wait_spill(RuntimeState* state) { + Status flush_if_full() { DCHECK(spilling_stream_); - auto status = spilling_stream_->wait_spill(); - RETURN_IF_ERROR(status); + Status status; // avoid small spill files if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) { status = spilling_stream_->spill_eof(); diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index ed7be9a0b28..e4631f1e1cd 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -42,10 +42,7 @@ SpillStream::SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* d spill_dir_(std::move(spill_dir)), batch_rows_(batch_rows), batch_bytes_(batch_bytes), - profile_(profile) { - io_thread_pool_ = - ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(data_dir->path()); -} + profile_(profile) {} SpillStream::~SpillStream() { bool exists = false; @@ -70,14 +67,6 @@ void SpillStream::close() { } VLOG_ROW << "closing: " << stream_id_; closed_ = true; - if (spill_promise_) { - spill_future_.wait(); - spill_promise_.reset(); - } - if (read_promise_) { - read_future_.wait(); - read_promise_.reset(); - } if (writer_) { (void)writer_->close(); @@ -97,25 +86,7 @@ const std::string& SpillStream::get_spill_root_dir() const { return data_dir_->path(); } Status SpillStream::prepare_spill() { - DCHECK(!spill_promise_); - RETURN_IF_ERROR(writer_->open()); - - spill_promise_ = std::make_unique<std::promise<Status>>(); - spill_future_ = spill_promise_->get_future(); - return Status::OK(); -} -void SpillStream::end_spill(const Status& status) { - spill_promise_->set_value(status); -} - -Status SpillStream::wait_spill() { - if (spill_promise_) { - SCOPED_TIMER(write_wait_io_timer_); - auto status = spill_future_.get(); - spill_promise_.reset(); - return status; - } - return Status::OK(); + return writer_->open(); } Status SpillStream::spill_block(RuntimeState* state, const Block& block, bool eof) { @@ -135,34 +106,13 @@ Status SpillStream::spill_eof() { } Status SpillStream::read_next_block_sync(Block* block, bool* eos) { - DCHECK(!read_promise_); DCHECK(reader_ != nullptr); - Status status; - read_promise_ = std::make_unique<std::promise<Status>>(); - read_future_ = read_promise_->get_future(); - // use thread pool to limit concurrent io tasks - status = io_thread_pool_->submit_func([this, block, eos] { - SCOPED_ATTACH_TASK(state_); - Status st; - Defer defer {[&]() { read_promise_->set_value(st); }}; - st = reader_->open(); - if (!st.ok()) { - return; - } - st = reader_->read(block, eos); - }); - if (!status.ok()) { - LOG(WARNING) << "read spill data failed: " << status; - read_promise_.reset(); - return status; - } + DCHECK(!_is_reading); + _is_reading = true; + Defer defer([this] { _is_reading = false; }); - { - SCOPED_TIMER(read_wait_io_timer_); - status = read_future_.get(); - } - read_promise_.reset(); - return status; + RETURN_IF_ERROR(reader_->open()); + return reader_->read(block, eos); } } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 68abfa9aaf7..638942d1af1 100644 --- a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -55,12 +55,8 @@ public: Status spill_block(RuntimeState* state, const Block& block, bool eof); - void end_spill(const Status& status); - Status spill_eof(); - Status wait_spill(); - Status read_next_block_sync(Block* block, bool* eos); void set_write_counters(RuntimeProfile::Counter* serialize_timer, @@ -91,7 +87,6 @@ private: void close(); RuntimeState* state_ = nullptr; - ThreadPool* io_thread_pool_; int64_t stream_id_; std::atomic_bool closed_ = false; SpillDataDir* data_dir_ = nullptr; @@ -99,10 +94,7 @@ private: size_t batch_rows_; size_t batch_bytes_; - std::unique_ptr<std::promise<Status>> spill_promise_; - std::future<Status> spill_future_; - std::unique_ptr<std::promise<Status>> read_promise_; - std::future<Status> read_future_; + std::atomic_bool _is_reading = false; SpillWriterUPtr writer_; SpillReaderUPtr reader_; diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 05a2531c466..2042555e49c 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -50,11 +50,6 @@ Status SpillStreamManager::init() { LOG(INFO) << "init spill stream manager"; RETURN_IF_ERROR(_init_spill_store_map()); - int spill_io_thread_count = config::spill_io_thread_pool_per_disk_thread_num; - if (spill_io_thread_count <= 0) { - spill_io_thread_count = 2; - } - int pool_idx = 0; for (const auto& [path, store] : _spill_store_map) { auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX); bool exists = true; @@ -85,20 +80,12 @@ Status SpillStreamManager::init() { } } store->update_spill_data_usage(spill_data_size); - - std::unique_ptr<ThreadPool> io_pool; - static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", pool_idx++)) - .set_min_threads(spill_io_thread_count) - .set_max_threads(spill_io_thread_count) - .set_max_queue_size(config::spill_io_thread_pool_queue_size) - .build(&io_pool)); - path_to_io_thread_pool_[path] = std::move(io_pool); } - static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool") - .set_min_threads(config::spill_async_task_thread_pool_thread_num) - .set_max_threads(config::spill_async_task_thread_pool_thread_num) - .set_max_queue_size(config::spill_async_task_thread_pool_queue_size) - .build(&async_task_thread_pool_)); + static_cast<void>(ThreadPoolBuilder("SpillIOThreadPool") + .set_min_threads(config::spill_io_thread_pool_thread_num) + .set_max_threads(config::spill_io_thread_pool_thread_num) + .set_max_queue_size(config::spill_io_thread_pool_queue_size) + .build(&_spill_io_thread_pool)); RETURN_IF_ERROR(Thread::create( "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); }, @@ -274,7 +261,7 @@ void SpillStreamManager::gc(int64_t max_file_count) { } void SpillStreamManager::async_cleanup_query(TUniqueId query_id) { - (void)get_async_task_thread_pool()->submit_func([this, query_id] { + (void)get_spill_io_thread_pool()->submit_func([this, query_id] { for (auto& [_, store] : _spill_store_map) { std::string query_spill_dir = fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX, print_id(query_id)); diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index 36062ce0b46..298af77afcc 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -112,12 +112,7 @@ public: void gc(int64_t max_file_count); - ThreadPool* get_spill_io_thread_pool(const std::string& path) const { - const auto it = path_to_io_thread_pool_.find(path); - DCHECK(it != path_to_io_thread_pool_.end()); - return it->second.get(); - } - ThreadPool* get_async_task_thread_pool() const { return async_task_thread_pool_.get(); } + ThreadPool* get_spill_io_thread_pool() const { return _spill_io_thread_pool.get(); } private: Status _init_spill_store_map(); @@ -127,8 +122,7 @@ private: std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map; CountDownLatch _stop_background_threads_latch; - std::unique_ptr<ThreadPool> async_task_thread_pool_; - std::unordered_map<std::string, std::unique_ptr<ThreadPool>> path_to_io_thread_pool_; + std::unique_ptr<ThreadPool> _spill_io_thread_pool; scoped_refptr<Thread> _spill_gc_thread; std::atomic_uint64_t id_ = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org