This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 93eca0c2ff0 improve spill stats
93eca0c2ff0 is described below

commit 93eca0c2ff0389d55a4da7da465d6d70ea5161fc
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Wed Nov 13 09:47:24 2024 +0800

    improve spill stats
---
 be/src/pipeline/exec/operator.h   | 22 ----------------------
 be/src/runtime/query_statistics.h | 10 ++++++----
 be/src/vec/spill/spill_reader.cpp | 12 ++++++++++++
 be/src/vec/spill/spill_reader.h   | 10 ++++++++--
 be/src/vec/spill/spill_stream.cpp | 10 +++++++---
 be/src/vec/spill/spill_writer.cpp |  6 ++++++
 be/src/vec/spill/spill_writer.h   | 12 +++++++++---
 7 files changed, 48 insertions(+), 34 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c0ef6d27af0..e7c668d2cdb 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -301,17 +301,6 @@ public:
         return Status::OK();
     }
 
-    Status close(RuntimeState* state) override {
-        if (Base::_query_statistics) {
-            auto* write_file_bytes = Base::profile()->get_counter("SpillWriteFileBytes");
-            auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes");
-            Base::_query_statistics->add_spill_bytes(
-                    write_file_bytes ? write_file_bytes->value() : 0,
-                    read_file_bytes ? read_file_bytes->value() : 0);
-        }
-        return Base::close(state);
-    }
-
     void init_spill_write_counters() {
         _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1);
 
@@ -741,17 +730,6 @@ public:
         return Status::OK();
     }
 
-    Status close(RuntimeState* state, Status exec_status) override {
-        if (Base::_query_statistics) {
-            auto* write_file_bytes = Base::profile()->get_counter("SpillWriteFileBytes");
-            auto* read_file_bytes = Base::profile()->get_counter("SpillReadFileBytes");
-            Base::_query_statistics->add_spill_bytes(
-                    write_file_bytes ? write_file_bytes->value() : 0,
-                    read_file_bytes ? read_file_bytes->value() : 0);
-        }
-        return Base::close(state, exec_status);
-    }
-
     std::vector<Dependency*> dependencies() const override {
         auto dependencies = Base::dependencies();
         return dependencies;
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index bffea2b1d2a..0b0174172ff 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -82,10 +82,12 @@ public:
         current_used_memory_bytes = current_used_memory;
     }
 
-    void add_spill_bytes(int64_t spill_write_bytes_to_local_storage,
-                         int64_t spill_read_bytes_from_local_storage) {
-        _spill_write_bytes_to_local_storage += spill_write_bytes_to_local_storage;
-        _spill_read_bytes_from_local_storage += spill_read_bytes_from_local_storage;
+    void add_spill_write_bytes_to_local_storage(int64_t bytes) {
+        _spill_write_bytes_to_local_storage += bytes;
+    }
+
+    void add_spill_read_bytes_from_local_storage(int64_t bytes) {
+        _spill_read_bytes_from_local_storage += bytes;
     }
 
     void to_pb(PQueryStatistics* statistics);
diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp
index 004de38f354..c947081fcaf 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -55,6 +55,9 @@ Status SpillReader::open() {
     RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read));
     DCHECK(bytes_read == 8); // max_sub_block_size, block count
     COUNTER_UPDATE(_read_file_size, bytes_read);
+    if (_query_statistics) {
+        _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+    }
 
     // read max sub block size
     bytes_read = 0;
@@ -62,6 +65,9 @@ Status SpillReader::open() {
     RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read));
     DCHECK(bytes_read == 8); // max_sub_block_size, block count
     COUNTER_UPDATE(_read_file_size, bytes_read);
+    if (_query_statistics) {
+        _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+    }
 
     size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_);
     try {
@@ -80,6 +86,9 @@ Status SpillReader::open() {
     RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
     DCHECK(bytes_read == block_count_ * sizeof(size_t));
     COUNTER_UPDATE(_read_file_size, bytes_read);
+    if (_query_statistics) {
+        _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+    }
 
     block_start_offsets_.resize(block_count_ + 1);
     for (size_t i = 0; i < block_count_; ++i) {
@@ -118,6 +127,9 @@ Status SpillReader::read(Block* block, bool* eos) {
 
     if (bytes_read > 0) {
         COUNTER_UPDATE(_read_file_size, bytes_read);
+        if (_query_statistics) {
+            _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+        }
         COUNTER_UPDATE(_read_block_count, 1);
         {
             SCOPED_TIMER(_deserialize_timer);
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index 1096e529bf3..da975bdb605 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -25,6 +25,7 @@
 
 #include "common/status.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "runtime/query_statistics.h"
 #include "util/runtime_profile.h"
 
 namespace doris::vectorized {
@@ -32,8 +33,11 @@ namespace doris::vectorized {
 class Block;
 class SpillReader {
 public:
-    SpillReader(int64_t stream_id, std::string file_path)
-            : stream_id_(stream_id), file_path_(std::move(file_path)) {}
+    SpillReader(std::shared_ptr<doris::QueryStatistics> query_statistics, int64_t stream_id,
+                std::string file_path)
+            : stream_id_(stream_id),
+              file_path_(std::move(file_path)),
+              _query_statistics(std::move(query_statistics)) {}
 
     ~SpillReader() { (void)close(); }
 
@@ -81,6 +85,8 @@ private:
     RuntimeProfile::Counter* _read_file_size = nullptr;
     RuntimeProfile::Counter* _read_rows_count = nullptr;
     RuntimeProfile::Counter* _read_file_count = nullptr;
+
+    std::shared_ptr<doris::QueryStatistics> _query_statistics = nullptr;
 };
 
 using SpillReaderUPtr = std::unique_ptr<SpillReader>;
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index f14e5f60974..3e5b93a21d7 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -25,6 +25,7 @@
 
 #include "io/fs/local_file_system.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/debug_points.h"
@@ -95,11 +96,14 @@ void SpillStream::gc() {
 }
 
 Status SpillStream::prepare() {
-    writer_ =
-            std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_, data_dir_, spill_dir_);
+    writer_ = std::make_unique<SpillWriter>(
+            state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), profile_,
+            stream_id_, batch_rows_, data_dir_, spill_dir_);
     _set_write_counters(profile_);
 
-    reader_ = std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
+    reader_ = std::make_unique<SpillReader>(
+            state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
+            writer_->get_file_path());
 
     DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
         return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream prepare_spill failed");
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index f6e16518043..0af0cb59a7d 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -52,6 +52,9 @@ Status SpillWriter::close() {
 
     total_written_bytes_ += meta_.size();
     COUNTER_UPDATE(_write_file_total_size, meta_.size());
+    if (_query_statistics) {
+        _query_statistics->add_spill_write_bytes_to_local_storage(meta_.size());
+    }
     if (_write_file_current_size) {
         COUNTER_UPDATE(_write_file_current_size, meta_.size());
     }
@@ -149,6 +152,9 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
 
                     meta_.append((const char*)&total_written_bytes_, sizeof(size_t));
                     COUNTER_UPDATE(_write_file_total_size, buff_size);
+                    if (_query_statistics) {
+                        _query_statistics->add_spill_write_bytes_to_local_storage(buff_size);
+                    }
                     if (_write_file_current_size) {
                         COUNTER_UPDATE(_write_file_current_size, buff_size);
                     }
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 05afebf5dcb..36a40e1e055 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "io/fs/file_writer.h"
+#include "runtime/query_statistics.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
 namespace doris {
@@ -32,9 +33,12 @@ namespace vectorized {
 class SpillDataDir;
 class SpillWriter {
 public:
-    SpillWriter(RuntimeProfile* profile, int64_t id, size_t batch_size, SpillDataDir* data_dir,
-                const std::string& dir)
-            : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) {
+    SpillWriter(std::shared_ptr<doris::QueryStatistics> query_statistics, RuntimeProfile* profile,
+                int64_t id, size_t batch_size, SpillDataDir* data_dir, const std::string& dir)
+            : data_dir_(data_dir),
+              stream_id_(id),
+              batch_size_(batch_size),
+              _query_statistics(std::move(query_statistics)) {
         // Directory path format specified in SpillStreamManager::register_spill_stream:
         // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id/0
         file_path_ = dir + "/0";
@@ -92,6 +96,8 @@ private:
     RuntimeProfile::Counter* _write_rows_counter = nullptr;
     RuntimeProfile::Counter* _memory_used_counter = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
+
+    std::shared_ptr<doris::QueryStatistics> _query_statistics = nullptr;
 };
 using SpillWriterUPtr = std::unique_ptr<SpillWriter>;
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to