This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6c5dd820c0aebad9c75b7455d1dfb482eae8c488 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Tue Apr 2 22:31:46 2024 +0800 [improvement](spill) improve spill timers (#33156) --- .../pipeline/exec/partitioned_aggregation_sink_operator.h | 3 ++- .../exec/partitioned_aggregation_source_operator.cpp | 6 +++--- .../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 14 ++++++++++---- .../pipeline/exec/partitioned_hash_join_probe_operator.h | 2 ++ .../pipeline/exec/partitioned_hash_join_sink_operator.cpp | 4 +++- be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 1 + be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +++--- be/src/pipeline/exec/spill_sort_source_operator.cpp | 5 +++-- be/src/pipeline/pipeline_x/operator.h | 12 ++++++++++++ be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 4 ++-- be/src/vec/spill/spill_stream.cpp | 6 +++++- be/src/vec/spill/spill_stream.h | 10 ++++++++-- 12 files changed, 54 insertions(+), 19 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 542046556ec..d63f272092b 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -131,7 +131,8 @@ public: RETURN_IF_ERROR(status); spill_stream->set_write_counters(Base::_spill_serialize_block_timer, Base::_spill_block_count, Base::_spill_data_size, - Base::_spill_write_disk_timer); + Base::_spill_write_disk_timer, + Base::_spill_write_wait_io_timer); status = to_block(context, keys, values, null_key_data); RETURN_IF_ERROR(status); diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index c328598ac44..5db80788f41 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -227,9 
+227,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime !_shared_state->spill_partitions.empty()) { for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) { - stream->set_read_counters(Base::_spill_read_data_time, - Base::_spill_deserialize_time, - Base::_spill_read_bytes); + stream->set_read_counters( + Base::_spill_read_data_time, Base::_spill_deserialize_time, + Base::_spill_read_bytes, Base::_spill_read_wait_io_timer); vectorized::Block block; bool eos = false; while (!eos) { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 8f859820252..c23e12c3705 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -73,6 +73,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI "SpillAndPartition", 1); _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", TUnit::BYTES, "SpillAndPartition", 1); + _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", + "SpillAndPartition", 1); + _spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", + "SpillAndPartition", 1); // Build phase _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase"); @@ -175,7 +179,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state std::numeric_limits<size_t>::max(), _runtime_profile.get())); RETURN_IF_ERROR(build_spilling_stream->prepare_spill()); build_spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer); + _spill_data_size, _spill_write_disk_timer, + _spill_write_wait_io_timer); } auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( @@ -225,7 +230,8 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat _runtime_profile.get())); RETURN_IF_ERROR(spilling_stream->prepare_spill()); spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer); + _spill_data_size, _spill_write_disk_timer, + _spill_write_wait_io_timer); } auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( @@ -294,7 +300,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in build_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(build_spilling_stream->spill_eof()); build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, - _spill_read_bytes); + _spill_read_bytes, _spill_read_wait_io_timer); } auto& probe_spilling_stream = _probe_spilling_streams[partition_index]; @@ -303,7 +309,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in probe_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(probe_spilling_stream->spill_eof()); probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, - _spill_read_bytes); + _spill_read_bytes, _spill_read_wait_io_timer); } return Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 7337017fde6..8270817758d 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -112,6 +112,8 @@ private: RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; RuntimeProfile::Counter* _spill_data_size = nullptr; RuntimeProfile::Counter* _spill_block_count = nullptr; + RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; + RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; RuntimeProfile::Counter* _build_phase_label = nullptr; RuntimeProfile::Counter* 
_build_rows_counter = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index dd119ade14b..f38354d5de2 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -40,6 +40,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteDiskTime", 1); _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", TUnit::BYTES, 1); _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); + _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteWaitIOTime", 1); return _partitioner->prepare(state, p._child_x->row_desc()); } @@ -80,7 +81,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { std::numeric_limits<size_t>::max(), _profile)); RETURN_IF_ERROR(spilling_stream->prepare_spill()); spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer); + _spill_data_size, _spill_write_disk_timer, + _spill_write_wait_io_timer); } auto* spill_io_pool = diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 4d25acd1b20..e364e225f66 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -80,6 +80,7 @@ protected: RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; RuntimeProfile::Counter* _spill_data_size = nullptr; RuntimeProfile::Counter* _spill_block_count = nullptr; + RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; }; class PartitionedHashJoinSinkOperatorX diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 9c3ae278ff6..662e195f3e5 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -210,9 +210,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); RETURN_IF_ERROR(status); - _spilling_stream->set_write_counters(Base::_spill_serialize_block_timer, - Base::_spill_block_count, Base::_spill_data_size, - Base::_spill_write_disk_timer); + _spilling_stream->set_write_counters( + Base::_spill_serialize_block_timer, Base::_spill_block_count, Base::_spill_data_size, + Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer); status = _spilling_stream->prepare_spill(); RETURN_IF_ERROR(status); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 6ae30a482f7..c021687e1df 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -134,7 +134,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat bool eos = false; tmp_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, - _spill_data_size, _spill_write_disk_timer); + _spill_data_size, _spill_write_disk_timer, + _spill_write_wait_io_timer); while (!eos && !state->is_cancelled()) { merge_sorted_block.clear_column_data(); { @@ -170,7 +171,7 @@ Status SpillSortLocalState::_create_intermediate_merger( for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty(); ++i) { auto stream = _shared_state->sorted_streams.front(); stream->set_read_counters(Base::_spill_read_data_time, Base::_spill_deserialize_time, - Base::_spill_read_bytes); + Base::_spill_read_bytes, Base::_spill_read_wait_io_timer); _current_merging_streams.emplace_back(stream); child_block_suppliers.emplace_back( 
std::bind(std::mem_fn(&vectorized::SpillStream::read_next_block_sync), stream.get(), diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 4ca8022d163..20fa46a5bf9 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -455,6 +455,10 @@ public: ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", "Spill", 1); _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", TUnit::BYTES, "Spill", 1); + _spill_write_wait_io_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1); + _spill_read_wait_io_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", "Spill", 1); return Status::OK(); } @@ -463,6 +467,8 @@ public: RuntimeProfile::Counter* _spill_read_data_time; RuntimeProfile::Counter* _spill_deserialize_time; RuntimeProfile::Counter* _spill_read_bytes; + RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; + RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; }; class DataSinkOperatorXBase; @@ -770,6 +776,10 @@ public: TUnit::BYTES, "Spill", 1); _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, "Spill", 1); + _spill_write_wait_io_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime", "Spill", 1); + _spill_read_wait_io_timer = + ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime", "Spill", 1); return Status::OK(); } @@ -779,6 +789,8 @@ public: RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; RuntimeProfile::Counter* _spill_data_size = nullptr; RuntimeProfile::Counter* _spill_block_count = nullptr; + RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr; + RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr; }; /** diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 9222c482381..4b85df05484 
100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -306,11 +306,12 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_ LOG_ONCE(INFO) << "no workload group for query " << print_id(state->query_id()); return false; } + const auto min_revocable_mem_bytes = state->min_revocable_mem(); bool is_wg_mem_low_water_mark = false; bool is_wg_mem_high_water_mark = false; wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark); if (is_wg_mem_high_water_mark) { - if (revocable_mem_bytes > 0) { + if (revocable_mem_bytes > min_revocable_mem_bytes) { LOG_EVERY_N(INFO, 5) << "revoke memory, hight water mark"; return true; } @@ -331,7 +332,6 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_ mem_limit_of_op = query_weighted_limit / big_memory_operator_num; } - const auto min_revocable_mem_bytes = state->min_revocable_mem(); LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: " << PrettyPrinter::print_bytes(revocable_mem_bytes) << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op) diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index f245f8fa309..843a9fc9658 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -95,6 +95,7 @@ void SpillStream::end_spill(const Status& status) { Status SpillStream::wait_spill() { if (spill_promise_) { + SCOPED_TIMER(write_wait_io_timer_); auto status = spill_future_.get(); spill_promise_.reset(); return status; @@ -141,7 +142,10 @@ Status SpillStream::read_next_block_sync(Block* block, bool* eos) { return status; } - status = read_future_.get(); + { + SCOPED_TIMER(read_wait_io_timer_); + status = read_future_.get(); + } read_promise_.reset(); return status; } diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h index 4d53b439712..afec4734d8a 100644 --- 
a/be/src/vec/spill/spill_stream.h +++ b/be/src/vec/spill/spill_stream.h @@ -64,15 +64,19 @@ public: void set_write_counters(RuntimeProfile::Counter* serialize_timer, RuntimeProfile::Counter* write_block_counter, RuntimeProfile::Counter* write_bytes_counter, - RuntimeProfile::Counter* write_timer) { + RuntimeProfile::Counter* write_timer, + RuntimeProfile::Counter* wait_io_timer) { writer_->set_counters(serialize_timer, write_block_counter, write_bytes_counter, write_timer); + write_wait_io_timer_ = wait_io_timer; } void set_read_counters(RuntimeProfile::Counter* read_timer, RuntimeProfile::Counter* deserialize_timer, - RuntimeProfile::Counter* read_bytes) { + RuntimeProfile::Counter* read_bytes, + RuntimeProfile::Counter* wait_io_timer) { reader_->set_counters(read_timer, deserialize_timer, read_bytes); + read_wait_io_timer_ = wait_io_timer; } private: @@ -100,6 +104,8 @@ private: SpillReaderUPtr reader_; RuntimeProfile* profile_ = nullptr; + RuntimeProfile::Counter* write_wait_io_timer_ = nullptr; + RuntimeProfile::Counter* read_wait_io_timer_ = nullptr; }; using SpillStreamSPtr = std::shared_ptr<SpillStream>; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org