This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6c5dd820c0aebad9c75b7455d1dfb482eae8c488
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Tue Apr 2 22:31:46 2024 +0800

    [improvement](spill) improve spill timers (#33156)
---
 .../pipeline/exec/partitioned_aggregation_sink_operator.h  |  3 ++-
 .../exec/partitioned_aggregation_source_operator.cpp       |  6 +++---
 .../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 14 ++++++++++----
 .../pipeline/exec/partitioned_hash_join_probe_operator.h   |  2 ++
 .../pipeline/exec/partitioned_hash_join_sink_operator.cpp  |  4 +++-
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.h |  1 +
 be/src/pipeline/exec/spill_sort_sink_operator.cpp          |  6 +++---
 be/src/pipeline/exec/spill_sort_source_operator.cpp        |  5 +++--
 be/src/pipeline/pipeline_x/operator.h                      | 12 ++++++++++++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp             |  4 ++--
 be/src/vec/spill/spill_stream.cpp                          |  6 +++++-
 be/src/vec/spill/spill_stream.h                            | 10 ++++++++--
 12 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 542046556ec..d63f272092b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -131,7 +131,8 @@ public:
         RETURN_IF_ERROR(status);
         spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
                                          Base::_spill_block_count, 
Base::_spill_data_size,
-                                         Base::_spill_write_disk_timer);
+                                         Base::_spill_write_disk_timer,
+                                         Base::_spill_write_wait_io_timer);
 
         status = to_block(context, keys, values, null_key_data);
         RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index c328598ac44..5db80788f41 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -227,9 +227,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                                !_shared_state->spill_partitions.empty()) {
                             for (auto& stream :
                                  
_shared_state->spill_partitions[0]->spill_streams_) {
-                                
stream->set_read_counters(Base::_spill_read_data_time,
-                                                          
Base::_spill_deserialize_time,
-                                                          
Base::_spill_read_bytes);
+                                stream->set_read_counters(
+                                        Base::_spill_read_data_time, 
Base::_spill_deserialize_time,
+                                        Base::_spill_read_bytes, 
Base::_spill_read_wait_io_timer);
                                 vectorized::Block block;
                                 bool eos = false;
                                 while (!eos) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 8f859820252..c23e12c3705 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -73,6 +73,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
                                                          "SpillAndPartition", 
1);
     _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillReadDataSize",
                                                      TUnit::BYTES, 
"SpillAndPartition", 1);
+    _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteWaitIOTime",
+                                                            
"SpillAndPartition", 1);
+    _spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadWaitIOTime",
+                                                           
"SpillAndPartition", 1);
 
     // Build phase
     _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
@@ -175,7 +179,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                 std::numeric_limits<size_t>::max(), _runtime_profile.get()));
         RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
         
build_spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                                  _spill_data_size, 
_spill_write_disk_timer);
+                                                  _spill_data_size, 
_spill_write_disk_timer,
+                                                  _spill_write_wait_io_timer);
     }
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@@ -225,7 +230,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 _runtime_profile.get()));
         RETURN_IF_ERROR(spilling_stream->prepare_spill());
         spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                            _spill_data_size, 
_spill_write_disk_timer);
+                                            _spill_data_size, 
_spill_write_disk_timer,
+                                            _spill_write_wait_io_timer);
     }
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@@ -294,7 +300,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
         build_spilling_stream->end_spill(Status::OK());
         RETURN_IF_ERROR(build_spilling_stream->spill_eof());
         build_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes);
+                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
     }
 
     auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
@@ -303,7 +309,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
         probe_spilling_stream->end_spill(Status::OK());
         RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
         probe_spilling_stream->set_read_counters(_spill_read_data_time, 
_spill_deserialize_time,
-                                                 _spill_read_bytes);
+                                                 _spill_read_bytes, 
_spill_read_wait_io_timer);
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 7337017fde6..8270817758d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -112,6 +112,8 @@ private:
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
     RuntimeProfile::Counter* _spill_data_size = nullptr;
     RuntimeProfile::Counter* _spill_block_count = nullptr;
+    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
 
     RuntimeProfile::Counter* _build_phase_label = nullptr;
     RuntimeProfile::Counter* _build_rows_counter = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index dd119ade14b..f38354d5de2 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -40,6 +40,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWriteDiskTime", 1);
     _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", 
TUnit::BYTES, 1);
     _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), 
"SpillWriteBlockCount", TUnit::UNIT, 1);
+    _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWriteWaitIOTime", 1);
 
     return _partitioner->prepare(state, p._child_x->row_desc());
 }
@@ -80,7 +81,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
                     std::numeric_limits<size_t>::max(), _profile));
             RETURN_IF_ERROR(spilling_stream->prepare_spill());
             spilling_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                                _spill_data_size, 
_spill_write_disk_timer);
+                                                _spill_data_size, 
_spill_write_disk_timer,
+                                                _spill_write_wait_io_timer);
         }
 
         auto* spill_io_pool =
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 4d25acd1b20..e364e225f66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -80,6 +80,7 @@ protected:
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
     RuntimeProfile::Counter* _spill_data_size = nullptr;
     RuntimeProfile::Counter* _spill_block_count = nullptr;
+    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
 };
 
 class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 9c3ae278ff6..662e195f3e5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -210,9 +210,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
             SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
     RETURN_IF_ERROR(status);
 
-    _spilling_stream->set_write_counters(Base::_spill_serialize_block_timer,
-                                         Base::_spill_block_count, 
Base::_spill_data_size,
-                                         Base::_spill_write_disk_timer);
+    _spilling_stream->set_write_counters(
+            Base::_spill_serialize_block_timer, Base::_spill_block_count, 
Base::_spill_data_size,
+            Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer);
 
     status = _spilling_stream->prepare_spill();
     RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 6ae30a482f7..c021687e1df 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -134,7 +134,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 
                 bool eos = false;
                 tmp_stream->set_write_counters(_spill_serialize_block_timer, 
_spill_block_count,
-                                               _spill_data_size, 
_spill_write_disk_timer);
+                                               _spill_data_size, 
_spill_write_disk_timer,
+                                               _spill_write_wait_io_timer);
                 while (!eos && !state->is_cancelled()) {
                     merge_sorted_block.clear_column_data();
                     {
@@ -170,7 +171,7 @@ Status SpillSortLocalState::_create_intermediate_merger(
     for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty(); 
++i) {
         auto stream = _shared_state->sorted_streams.front();
         stream->set_read_counters(Base::_spill_read_data_time, 
Base::_spill_deserialize_time,
-                                  Base::_spill_read_bytes);
+                                  Base::_spill_read_bytes, 
Base::_spill_read_wait_io_timer);
         _current_merging_streams.emplace_back(stream);
         child_block_suppliers.emplace_back(
                 
std::bind(std::mem_fn(&vectorized::SpillStream::read_next_block_sync), 
stream.get(),
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 4ca8022d163..20fa46a5bf9 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -455,6 +455,10 @@ public:
                 ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillDeserializeTime", "Spill", 1);
         _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillReadDataSize",
                                                          TUnit::BYTES, 
"Spill", 1);
+        _spill_write_wait_io_timer =
+                ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteWaitIOTime", "Spill", 1);
+        _spill_read_wait_io_timer =
+                ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadWaitIOTime", "Spill", 1);
         return Status::OK();
     }
 
@@ -463,6 +467,8 @@ public:
     RuntimeProfile::Counter* _spill_read_data_time;
     RuntimeProfile::Counter* _spill_deserialize_time;
     RuntimeProfile::Counter* _spill_read_bytes;
+    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
 };
 
 class DataSinkOperatorXBase;
@@ -770,6 +776,10 @@ public:
                                                         TUnit::BYTES, "Spill", 
1);
         _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount",
                                                           TUnit::UNIT, 
"Spill", 1);
+        _spill_write_wait_io_timer =
+                ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteWaitIOTime", "Spill", 1);
+        _spill_read_wait_io_timer =
+                ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadWaitIOTime", "Spill", 1);
         return Status::OK();
     }
 
@@ -779,6 +789,8 @@ public:
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
     RuntimeProfile::Counter* _spill_data_size = nullptr;
     RuntimeProfile::Counter* _spill_block_count = nullptr;
+    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
 };
 
 /**
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 9222c482381..4b85df05484 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -306,11 +306,12 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
         LOG_ONCE(INFO) << "no workload group for query " << 
print_id(state->query_id());
         return false;
     }
+    const auto min_revocable_mem_bytes = state->min_revocable_mem();
     bool is_wg_mem_low_water_mark = false;
     bool is_wg_mem_high_water_mark = false;
     wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
     if (is_wg_mem_high_water_mark) {
-        if (revocable_mem_bytes > 0) {
+        if (revocable_mem_bytes > min_revocable_mem_bytes) {
             LOG_EVERY_N(INFO, 5) << "revoke memory, hight water mark";
             return true;
         }
@@ -331,7 +332,6 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
             mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
         }
 
-        const auto min_revocable_mem_bytes = state->min_revocable_mem();
         LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, 
revocable_mem_bytes: "
                              << PrettyPrinter::print_bytes(revocable_mem_bytes)
                              << ", mem_limit_of_op: " << 
PrettyPrinter::print_bytes(mem_limit_of_op)
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index f245f8fa309..843a9fc9658 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -95,6 +95,7 @@ void SpillStream::end_spill(const Status& status) {
 
 Status SpillStream::wait_spill() {
     if (spill_promise_) {
+        SCOPED_TIMER(write_wait_io_timer_);
         auto status = spill_future_.get();
         spill_promise_.reset();
         return status;
@@ -141,7 +142,10 @@ Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
         return status;
     }
 
-    status = read_future_.get();
+    {
+        SCOPED_TIMER(read_wait_io_timer_);
+        status = read_future_.get();
+    }
     read_promise_.reset();
     return status;
 }
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 4d53b439712..afec4734d8a 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -64,15 +64,19 @@ public:
     void set_write_counters(RuntimeProfile::Counter* serialize_timer,
                             RuntimeProfile::Counter* write_block_counter,
                             RuntimeProfile::Counter* write_bytes_counter,
-                            RuntimeProfile::Counter* write_timer) {
+                            RuntimeProfile::Counter* write_timer,
+                            RuntimeProfile::Counter* wait_io_timer) {
         writer_->set_counters(serialize_timer, write_block_counter, 
write_bytes_counter,
                               write_timer);
+        write_wait_io_timer_ = wait_io_timer;
     }
 
     void set_read_counters(RuntimeProfile::Counter* read_timer,
                            RuntimeProfile::Counter* deserialize_timer,
-                           RuntimeProfile::Counter* read_bytes) {
+                           RuntimeProfile::Counter* read_bytes,
+                           RuntimeProfile::Counter* wait_io_timer) {
         reader_->set_counters(read_timer, deserialize_timer, read_bytes);
+        read_wait_io_timer_ = wait_io_timer;
     }
 
 private:
@@ -100,6 +104,8 @@ private:
     SpillReaderUPtr reader_;
 
     RuntimeProfile* profile_ = nullptr;
+    RuntimeProfile::Counter* write_wait_io_timer_ = nullptr;
+    RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;
 };
 using SpillStreamSPtr = std::shared_ptr<SpillStream>;
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to