This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 93eca0c2ff0 improve spill stats 93eca0c2ff0 is described below commit 93eca0c2ff0389d55a4da7da465d6d70ea5161fc Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Wed Nov 13 09:47:24 2024 +0800 improve spill stats --- be/src/pipeline/exec/operator.h | 22 ---------------------- be/src/runtime/query_statistics.h | 10 ++++++---- be/src/vec/spill/spill_reader.cpp | 12 ++++++++++++ be/src/vec/spill/spill_reader.h | 10 ++++++++-- be/src/vec/spill/spill_stream.cpp | 10 +++++++--- be/src/vec/spill/spill_writer.cpp | 6 ++++++ be/src/vec/spill/spill_writer.h | 12 +++++++++--- 7 files changed, 48 insertions(+), 34 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index c0ef6d27af0..e7c668d2cdb 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -301,17 +301,6 @@ public: return Status::OK(); } - Status close(RuntimeState* state) override { - if (Base::_query_statistics) { - auto* write_file_bytes = Base::profile()->get_counter("SpillWriteFileBytes"); - auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes"); - Base::_query_statistics->add_spill_bytes( - write_file_bytes ? write_file_bytes->value() : 0, - read_file_bytes ? read_file_bytes->value() : 0); - } - return Base::close(state); - } - void init_spill_write_counters() { _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1); @@ -741,17 +730,6 @@ public: return Status::OK(); } - Status close(RuntimeState* state, Status exec_status) override { - if (Base::_query_statistics) { - auto* write_file_bytes = Base::profile()->get_counter("SpillWriteFileBytes"); - auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes"); - Base::_query_statistics->add_spill_bytes( - write_file_bytes ? write_file_bytes->value() : 0, - read_file_bytes ? read_file_bytes->value() : 0); - } - return Base::close(state, exec_status); - } - std::vector<Dependency*> dependencies() const override { auto dependencies = Base::dependencies(); return dependencies; diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index bffea2b1d2a..0b0174172ff 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -82,10 +82,12 @@ public: current_used_memory_bytes = current_used_memory; } - void add_spill_bytes(int64_t spill_write_bytes_to_local_storage, - int64_t spill_read_bytes_from_local_storage) { - _spill_write_bytes_to_local_storage += spill_write_bytes_to_local_storage; - _spill_read_bytes_from_local_storage += spill_read_bytes_from_local_storage; + void add_spill_write_bytes_to_local_storage(int64_t bytes) { + _spill_write_bytes_to_local_storage += bytes; + } + + void add_spill_read_bytes_from_local_storage(int64_t bytes) { + _spill_read_bytes_from_local_storage += bytes; } void to_pb(PQueryStatistics* statistics); diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp index 004de38f354..c947081fcaf 100644 --- a/be/src/vec/spill/spill_reader.cpp +++ b/be/src/vec/spill/spill_reader.cpp @@ -55,6 +55,9 @@ Status SpillReader::open() { RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read)); DCHECK(bytes_read == 8); // max_sub_block_size, block count COUNTER_UPDATE(_read_file_size, bytes_read); + if (_query_statistics) { + _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); + } // read max sub block size bytes_read = 0; @@ -62,6 +65,9 @@ Status SpillReader::open() { RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read)); DCHECK(bytes_read == 8); // max_sub_block_size, block count COUNTER_UPDATE(_read_file_size, bytes_read); + if (_query_statistics) { + _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); + } size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_); try { @@ -80,6 +86,9 @@ Status SpillReader::open() { RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read)); DCHECK(bytes_read == block_count_ * sizeof(size_t)); COUNTER_UPDATE(_read_file_size, bytes_read); + if (_query_statistics) { + _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); + } block_start_offsets_.resize(block_count_ + 1); for (size_t i = 0; i < block_count_; ++i) { @@ -118,6 +127,9 @@ Status SpillReader::read(Block* block, bool* eos) { if (bytes_read > 0) { COUNTER_UPDATE(_read_file_size, bytes_read); + if (_query_statistics) { + _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read); + } COUNTER_UPDATE(_read_block_count, 1); { SCOPED_TIMER(_deserialize_timer); diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h index 1096e529bf3..da975bdb605 100644 --- a/be/src/vec/spill/spill_reader.h +++ b/be/src/vec/spill/spill_reader.h @@ -25,6 +25,7 @@ #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" +#include "runtime/query_statistics.h" #include "util/runtime_profile.h" namespace doris::vectorized { @@ -32,8 +33,11 @@ namespace doris::vectorized { class Block; class SpillReader { public: - SpillReader(int64_t stream_id, std::string file_path) - : stream_id_(stream_id), file_path_(std::move(file_path)) {} + SpillReader(std::shared_ptr<doris::QueryStatistics> query_statistics, int64_t stream_id, + std::string file_path) + : stream_id_(stream_id), + file_path_(std::move(file_path)), + _query_statistics(std::move(query_statistics)) {} ~SpillReader() { (void)close(); } @@ -81,6 +85,8 @@ private: RuntimeProfile::Counter* _read_file_size = nullptr; RuntimeProfile::Counter* _read_rows_count = nullptr; RuntimeProfile::Counter* _read_file_count = nullptr; + + std::shared_ptr<doris::QueryStatistics> _query_statistics = nullptr; }; using SpillReaderUPtr = std::unique_ptr<SpillReader>; diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp index f14e5f60974..3e5b93a21d7 100644 --- a/be/src/vec/spill/spill_stream.cpp +++ b/be/src/vec/spill/spill_stream.cpp @@ -25,6 +25,7 @@ #include "io/fs/local_file_system.h" #include "runtime/exec_env.h" +#include "runtime/query_context.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/debug_points.h" @@ -95,11 +96,14 @@ void SpillStream::gc() { } Status SpillStream::prepare() { - writer_ = - std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_, data_dir_, spill_dir_); + writer_ = std::make_unique<SpillWriter>( + state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), profile_, + stream_id_, batch_rows_, data_dir_, spill_dir_); _set_write_counters(profile_); - reader_ = std::make_unique<SpillReader>(stream_id_, writer_->get_file_path()); + reader_ = std::make_unique<SpillReader>( + state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_, + writer_->get_file_path()); DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", { return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed"); diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index f6e16518043..0af0cb59a7d 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -52,6 +52,9 @@ Status SpillWriter::close() { total_written_bytes_ += meta_.size(); COUNTER_UPDATE(_write_file_total_size, meta_.size()); + if (_query_statistics) { + _query_statistics->add_spill_write_bytes_to_local_storage(meta_.size()); + } if (_write_file_current_size) { COUNTER_UPDATE(_write_file_current_size, meta_.size()); } @@ -149,6 +152,9 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); COUNTER_UPDATE(_write_file_total_size, buff_size); + if (_query_statistics) { + _query_statistics->add_spill_write_bytes_to_local_storage(buff_size); + } if (_write_file_current_size) { COUNTER_UPDATE(_write_file_current_size, buff_size); } diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h index 05afebf5dcb..36a40e1e055 100644 --- a/be/src/vec/spill/spill_writer.h +++ b/be/src/vec/spill/spill_writer.h @@ -22,6 +22,7 @@ #include <string> #include "io/fs/file_writer.h" +#include "runtime/query_statistics.h" #include "util/runtime_profile.h" #include "vec/core/block.h" namespace doris { @@ -32,9 +33,12 @@ namespace vectorized { class SpillDataDir; class SpillWriter { public: - SpillWriter(RuntimeProfile* profile, int64_t id, size_t batch_size, SpillDataDir* data_dir, - const std::string& dir) - : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) { + SpillWriter(std::shared_ptr<doris::QueryStatistics> query_statistics, RuntimeProfile* profile, + int64_t id, size_t batch_size, SpillDataDir* data_dir, const std::string& dir) + : data_dir_(data_dir), + stream_id_(id), + batch_size_(batch_size), + _query_statistics(std::move(query_statistics)) { // Directory path format specified in SpillStreamManager::register_spill_stream: // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id/0 file_path_ = dir + "/0"; @@ -92,6 +96,8 @@ private: RuntimeProfile::Counter* _write_rows_counter = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr; + + std::shared_ptr<doris::QueryStatistics> _query_statistics = nullptr; }; using SpillWriterUPtr = std::unique_ptr<SpillWriter>; } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org