This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2e40e395844cf716a91f09c4be9bcf7b583aefbd
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Apr 3 15:19:04 2024 +0800

    [chore](spill) add timers for performance tuning (#33185)
---
 .../exec/partitioned_aggregation_sink_operator.cpp |   5 +-
 .../partitioned_aggregation_source_operator.cpp    |   7 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 100 ++++++++++++---------
 .../exec/partitioned_hash_join_probe_operator.h    |  24 +++--
 .../exec/partitioned_hash_join_sink_operator.cpp   |  53 +++++------
 .../exec/partitioned_hash_join_sink_operator.h     |  12 +--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  10 ++-
 .../pipeline/exec/spill_sort_source_operator.cpp   |   9 +-
 be/src/pipeline/pipeline_x/operator.h              |   6 ++
 9 files changed, 137 insertions(+), 89 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 64996724e15..3dea330c117 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -252,13 +252,16 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
 
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
     status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
-            [this, &parent, state, execution_context] {
+            [this, &parent, state, execution_context, submit_timer] {
                 auto execution_context_lock = execution_context.lock();
                 if (!execution_context_lock) {
                     LOG(INFO) << "execution_context released, maybe query was cancelled.";
                     return Status::Cancelled("Cancelled");
                 }
+                _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
                 SCOPED_ATTACH_TASK(state);
                 SCOPED_TIMER(Base::_spill_timer);
                 Defer defer {[&]() {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 5db80788f41..5680b75c87e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -197,9 +197,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
     RETURN_IF_ERROR(
             
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
-                    [this, state, execution_context] {
+                    [this, state, execution_context, submit_timer] {
                         auto execution_context_lock = execution_context.lock();
                         if (!execution_context_lock) {
                             LOG(INFO) << "execution_context released, maybe query was cancelled.";
@@ -207,6 +211,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                             return Status::Cancelled("Cancelled");
                         }
 
+                        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
                         SCOPED_ATTACH_TASK(state);
                         Defer defer {[&]() {
                             if (!_status.ok()) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index c23e12c3705..1a05b78b052 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -25,10 +25,11 @@ namespace doris::pipeline {
 
 
PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeState*
 state,
                                                                        
OperatorXBase* parent)
-        : JoinProbeLocalState(state, parent) {}
+        : PipelineXSpillLocalState(state, parent),
+          _child_block(vectorized::Block::create_unique()) {}
 
 Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
-    RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
+    RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info));
     _internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
     auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
 
@@ -38,45 +39,32 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     RETURN_IF_ERROR(_partitioner->init(p._probe_exprs));
     RETURN_IF_ERROR(_partitioner->prepare(state, p._child_x->row_desc()));
 
-    _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), 
"SpillAndPartition");
-    _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", 
"SpillAndPartition");
-    _partition_shuffle_timer =
-            ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", 
"SpillAndPartition");
-    _spill_build_rows =
-            ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, 
"SpillAndPartition");
-    _recovery_build_rows =
-            ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, 
"SpillAndPartition");
-    _spill_probe_rows =
-            ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, 
"SpillAndPartition");
-    _recovery_probe_rows =
-            ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, 
"SpillAndPartition");
-    _spill_build_blocks =
-            ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, 
"SpillAndPartition");
+    _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition");
+    _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime", "Partition");
+    _partition_shuffle_timer = ADD_CHILD_TIMER(profile(), "PartitionShuffleTime", "Partition");
+    _spill_build_rows = ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT, "Spill");
+    _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1);
+    _recovery_build_rows = ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT, "Spill");
+    _recovery_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "RecoveryBuildTime", "Spill", 1);
+    _spill_probe_rows = ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT, "Spill");
+    _recovery_probe_rows = ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT, "Spill");
+    _spill_build_blocks = ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT, "Spill");
     _recovery_build_blocks =
-            ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT, 
"SpillAndPartition");
-    _spill_probe_blocks =
-            ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT, 
"SpillAndPartition");
+            ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT, 
"Spill");
+    _spill_probe_blocks = ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", 
TUnit::UNIT, "Spill");
+    _spill_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), 
"SpillProbeTime", "Spill", 1);
     _recovery_probe_blocks =
-            ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, 
"SpillAndPartition");
+            ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, 
"Spill");
+    _recovery_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), 
"RecoveryProbeTime", "Spill", 1);
 
-    _spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL(
-            Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition", 
1);
-    _spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteDiskTime",
-                                                         "SpillAndPartition", 
1);
+    _spill_serialize_block_timer =
+            ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillSerializeBlockTime", "Spill", 1);
+    _spill_write_disk_timer =
+            ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", 
"Spill", 1);
     _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteDataSize",
-                                                    TUnit::BYTES, 
"SpillAndPartition", 1);
+                                                    TUnit::BYTES, "Spill", 1);
     _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount",
-                                                      TUnit::UNIT, 
"SpillAndPartition", 1);
-    _spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadDataTime",
-                                                       "SpillAndPartition", 1);
-    _spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillDeserializeTime",
-                                                         "SpillAndPartition", 
1);
-    _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillReadDataSize",
-                                                     TUnit::BYTES, 
"SpillAndPartition", 1);
-    _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteWaitIOTime",
-                                                            
"SpillAndPartition", 1);
-    _spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillReadWaitIOTime",
-                                                           
"SpillAndPartition", 1);
+                                                      TUnit::UNIT, "Spill", 1);
 
     // Build phase
     _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
@@ -109,6 +97,10 @@ Status 
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
     _process_other_join_conjunct_timer =
             ADD_CHILD_TIMER(profile(), "OtherJoinConjunctTime", "ProbePhase");
     _init_probe_side_timer = ADD_CHILD_TIMER(profile(), "InitProbeSideTime", "ProbePhase");
+    _probe_timer = ADD_CHILD_TIMER(profile(), "ProbeTime", "ProbePhase");
+    _join_filter_timer = ADD_CHILD_TIMER(profile(), "JoinFilterTimer", "ProbePhase");
+    _build_output_block_timer = ADD_CHILD_TIMER(profile(), "BuildOutputBlock", "ProbePhase");
+    _probe_rows_counter = ADD_CHILD_COUNTER(profile(), "ProbeRows", TUnit::UNIT, "ProbePhase");
     return Status::OK();
 }
 #define UPDATE_PROFILE(counter, name)                           \
@@ -149,7 +141,7 @@ void PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch
 #undef UPDATE_PROFILE
 
 Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
-    RETURN_IF_ERROR(PipelineXLocalStateBase::open(state));
+    RETURN_IF_ERROR(PipelineXSpillLocalState::open(state));
     return _partitioner->open(state);
 }
 Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
@@ -157,7 +149,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
         return Status::OK();
     }
     dec_running_big_mem_op_num(state);
-    RETURN_IF_ERROR(JoinProbeLocalState::close(state));
+    RETURN_IF_ERROR(PipelineXSpillLocalState::close(state));
     return Status::OK();
 }
 
@@ -187,13 +179,17 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
             build_spilling_stream->get_spill_root_dir());
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
     return spill_io_pool->submit_func(
-            [execution_context, state, &build_spilling_stream, &mutable_block, this] {
+            [execution_context, state, &build_spilling_stream, &mutable_block, submit_timer, this] {
                 auto execution_context_lock = execution_context.lock();
                 if (!execution_context_lock) {
                     LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
                     return;
                 }
+                _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+                SCOPED_TIMER(_spill_build_timer);
                 (void)state; // avoid ut compile error
                 SCOPED_ATTACH_TASK(state);
                 if (_spill_status_ok) {
@@ -248,14 +244,18 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
     if (!blocks.empty()) {
         auto execution_context = state->get_task_execution_context();
         _shared_state_holder = _shared_state->shared_from_this();
+        MonotonicStopWatch submit_timer;
+        submit_timer.start();
         return spill_io_pool->submit_func(
-                [execution_context, state, &blocks, spilling_stream, this] {
+                [execution_context, state, &blocks, spilling_stream, submit_timer, this] {
                     auto execution_context_lock = execution_context.lock();
                     if (!execution_context_lock) {
                         LOG(INFO) << "execution_context released, maybe query 
was cancelled.";
                         _dependency->set_ready();
                         return;
                     }
+                    _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+                    SCOPED_TIMER(_spill_probe_timer);
                     SCOPED_ATTACH_TASK(state);
                     COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
                     while (!blocks.empty() && !state->is_cancelled()) {
@@ -329,12 +329,19 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
-    auto read_func = [this, state, &spilled_stream, &mutable_block, 
execution_context] {
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto read_func = [this, state, &spilled_stream, &mutable_block, execution_context,
+                      submit_timer] {
         auto execution_context_lock = execution_context.lock();
         if (!execution_context_lock) {
             LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
             return;
         }
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        SCOPED_TIMER(_recovery_build_timer);
         Defer defer([this] { --_spilling_task_count; });
         (void)state; // avoid ut compile error
         SCOPED_ATTACH_TASK(state);
@@ -403,12 +410,19 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
     /// TODO: maybe recovery more blocks each time.
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
-    auto read_func = [this, execution_context, state, &spilled_stream, 
&blocks] {
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto read_func = [this, execution_context, state, &spilled_stream, &blocks, submit_timer] {
         auto execution_context_lock = execution_context.lock();
         if (!execution_context_lock) {
             LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
             return;
         }
+
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        SCOPED_TIMER(_recovery_probe_timer);
         Defer defer([this] { --_spilling_task_count; });
         (void)state; // avoid ut compile error
         SCOPED_ATTACH_TASK(state);
@@ -827,4 +841,4 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     return Status::OK();
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 8270817758d..143576e1b86 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -38,16 +38,13 @@ using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
 class PartitionedHashJoinProbeOperatorX;
 
 class PartitionedHashJoinProbeLocalState final
-        : public JoinProbeLocalState<PartitionedHashJoinSharedState,
-                                     PartitionedHashJoinProbeLocalState> {
+        : public PipelineXSpillLocalState<PartitionedHashJoinSharedState> {
 public:
     using Parent = PartitionedHashJoinProbeOperatorX;
     ENABLE_FACTORY_CREATOR(PartitionedHashJoinProbeLocalState);
     PartitionedHashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent);
     ~PartitionedHashJoinProbeLocalState() override = default;
 
-    void add_tuple_is_null_column(vectorized::Block* block) override {}
-
     Status init(RuntimeState* state, LocalStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
@@ -68,9 +65,15 @@ public:
     friend class PartitionedHashJoinProbeOperatorX;
 
 private:
+    template <typename LocalStateType>
+    friend class StatefulOperatorX;
+
     std::shared_ptr<BasicSharedState> _in_mem_shared_state_sptr;
     uint32_t _partition_cursor {0};
 
+    std::unique_ptr<vectorized::Block> _child_block;
+    bool _child_eos {false};
+
     std::mutex _spill_lock;
     Status _spill_status;
 
@@ -98,22 +101,21 @@ private:
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
     RuntimeProfile::Counter* _spill_build_rows = nullptr;
     RuntimeProfile::Counter* _spill_build_blocks = nullptr;
+    RuntimeProfile::Counter* _spill_build_timer = nullptr;
     RuntimeProfile::Counter* _recovery_build_rows = nullptr;
     RuntimeProfile::Counter* _recovery_build_blocks = nullptr;
+    RuntimeProfile::Counter* _recovery_build_timer = nullptr;
     RuntimeProfile::Counter* _spill_probe_rows = nullptr;
     RuntimeProfile::Counter* _spill_probe_blocks = nullptr;
+    RuntimeProfile::Counter* _spill_probe_timer = nullptr;
     RuntimeProfile::Counter* _recovery_probe_rows = nullptr;
     RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
+    RuntimeProfile::Counter* _recovery_probe_timer = nullptr;
 
-    RuntimeProfile::Counter* _spill_read_data_time = nullptr;
-    RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
-    RuntimeProfile::Counter* _spill_read_bytes = nullptr;
     RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
     RuntimeProfile::Counter* _spill_data_size = nullptr;
     RuntimeProfile::Counter* _spill_block_count = nullptr;
-    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
-    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
 
     RuntimeProfile::Counter* _build_phase_label = nullptr;
     RuntimeProfile::Counter* _build_rows_counter = nullptr;
@@ -137,6 +139,10 @@ private:
     RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
     RuntimeProfile::Counter* _build_side_output_timer = nullptr;
     RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr;
+    RuntimeProfile::Counter* _probe_timer = nullptr;
+    RuntimeProfile::Counter* _probe_rows_counter = nullptr;
+    RuntimeProfile::Counter* _join_filter_timer = nullptr;
+    RuntimeProfile::Counter* _build_output_block_timer = nullptr;
 };
 
 class PartitionedHashJoinProbeOperatorX final
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f38354d5de2..8b9accd30ad 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -25,7 +25,7 @@ namespace doris::pipeline {
 
 Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
                                                
doris::pipeline::LocalSinkStateInfo& info) {
-    RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
+    RETURN_IF_ERROR(PipelineXSpillSinkLocalState::init(state, info));
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     _shared_state->partitioned_build_blocks.resize(p._partition_count);
     _shared_state->spilled_streams.resize(p._partition_count);
@@ -33,30 +33,26 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
     _partitioner = std::make_unique<PartitionerType>(p._partition_count);
     RETURN_IF_ERROR(_partitioner->init(p._build_exprs));
 
-    _partition_timer = ADD_TIMER(profile(), "PartitionTime");
-    _partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime");
-
-    _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillSerializeBlockTime", 1);
-    _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWriteDiskTime", 1);
-    _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", 
TUnit::BYTES, 1);
-    _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), 
"SpillWriteBlockCount", TUnit::UNIT, 1);
-    _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillWriteWaitIOTime", 1);
+    _partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime", "Spill", 1);
+    _partition_shuffle_timer =
+            ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime", "Spill", 1);
+    _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillBuildTime", "Spill", 1);
 
     return _partitioner->prepare(state, p._child_x->row_desc());
 }
 
 Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
-    RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
+    RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
     return _partitioner->open(state);
 }
 Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
-    SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
-    SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
-    if (PipelineXSinkLocalState::_closed) {
+    SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
+    SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
+    if (PipelineXSpillSinkLocalState::_closed) {
         return Status::OK();
     }
     dec_running_big_mem_op_num(state);
-    return PipelineXSinkLocalState::close(state, exec_status);
+    return PipelineXSpillSinkLocalState::close(state, exec_status);
 }
 
 Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
@@ -90,16 +86,23 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         DCHECK(spill_io_pool != nullptr);
         auto execution_context = state->get_task_execution_context();
         _shared_state_holder = _shared_state->shared_from_this();
-        auto st = spill_io_pool->submit_func([this, execution_context, state, 
spilling_stream, i] {
-            auto execution_context_lock = execution_context.lock();
-            if (!execution_context_lock) {
-                LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
-                return;
-            }
-            (void)state; // avoid ut compile error
-            SCOPED_ATTACH_TASK(state);
-            _spill_to_disk(i, spilling_stream);
-        });
+
+        MonotonicStopWatch submit_timer;
+        submit_timer.start();
+
+        auto st = spill_io_pool->submit_func(
+                [this, execution_context, state, spilling_stream, i, submit_timer] {
+                    auto execution_context_lock = execution_context.lock();
+                    if (!execution_context_lock) {
+                        LOG(INFO) << "execution_context released, maybe query 
was cancelled.";
+                        return;
+                    }
+                    _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+                    SCOPED_TIMER(_spill_build_timer);
+                    (void)state; // avoid ut compile error
+                    SCOPED_ATTACH_TASK(state);
+                    _spill_to_disk(i, spilling_stream);
+                });
 
         if (!st.ok()) {
             --_spilling_streams_count;
@@ -274,4 +277,4 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
     return local_state.revoke_memory(state);
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e364e225f66..96e751360d4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -39,7 +39,7 @@ using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
 class PartitionedHashJoinSinkOperatorX;
 
 class PartitionedHashJoinSinkLocalState
-        : public PipelineXSinkLocalState<PartitionedHashJoinSharedState> {
+        : public PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState> {
 public:
     using Parent = PartitionedHashJoinSinkOperatorX;
     ENABLE_FACTORY_CREATOR(PartitionedHashJoinSinkLocalState);
@@ -51,7 +51,7 @@ public:
 
 protected:
     PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
-            : PipelineXSinkLocalState<PartitionedHashJoinSharedState>(parent, 
state) {}
+            : PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
 
     void _spill_to_disk(uint32_t partition_index,
                         const vectorized::SpillStreamSPtr& spilling_stream);
@@ -76,11 +76,7 @@ protected:
 
     RuntimeProfile::Counter* _partition_timer = nullptr;
     RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
-    RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
-    RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
-    RuntimeProfile::Counter* _spill_data_size = nullptr;
-    RuntimeProfile::Counter* _spill_block_count = nullptr;
-    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_build_timer = nullptr;
 };
 
 class PartitionedHashJoinSinkOperatorX
@@ -139,4 +135,4 @@ private:
 };
 
 } // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 662e195f3e5..523ff2cfaaf 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -55,6 +55,9 @@ void SpillSortSinkLocalState::_init_counters() {
 
     _spill_merge_sort_timer =
             ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime", 
"Spill", 1);
+
+    _spill_wait_in_queue_timer =
+            ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1);
 }
 #define UPDATE_PROFILE(counter, name)                           \
     do {                                                        \
@@ -227,17 +230,22 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
     status =
             ExecEnv::GetInstance()
                     ->spill_stream_mgr()
                     
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
-                    ->submit_func([this, state, &parent, execution_context] {
+                    ->submit_func([this, state, &parent, execution_context, submit_timer] {
                         auto execution_context_lock = execution_context.lock();
                         if (!execution_context_lock) {
                             LOG(INFO) << "execution_context released, maybe 
query was cancelled.";
                             return Status::OK();
                         }
 
+                        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
                         SCOPED_ATTACH_TASK(state);
                         Defer defer {[&]() {
                             if (!_shared_state->sink_status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index c021687e1df..c53c057088c 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -43,6 +43,8 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
                                                     TUnit::BYTES, "Spill", 1);
     _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount",
                                                       TUnit::UNIT, "Spill", 1);
+    _spill_wait_in_queue_timer =
+            ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime", "Spill", 1);
     return Status::OK();
 }
 
@@ -82,13 +84,18 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
 
     auto execution_context = state->get_task_execution_context();
     _shared_state_holder = _shared_state->shared_from_this();
-    auto spill_func = [this, state, &parent, execution_context] {
+
+    MonotonicStopWatch submit_timer;
+    submit_timer.start();
+
+    auto spill_func = [this, state, &parent, execution_context, submit_timer] {
         auto execution_context_lock = execution_context.lock();
         if (!execution_context_lock) {
             LOG(INFO) << "execution_context released, maybe query was 
cancelled.";
             return Status::OK();
         }
 
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
         SCOPED_TIMER(_spill_merge_sort_timer);
         SCOPED_ATTACH_TASK(state);
         Defer defer {[&]() {
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 20fa46a5bf9..45e42390bc5 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -455,6 +455,8 @@ public:
                 ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillDeserializeTime", "Spill", 1);
         _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillReadDataSize",
                                                          TUnit::BYTES, 
"Spill", 1);
+        _spill_wait_in_queue_timer =
+                ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1);
         _spill_write_wait_io_timer =
                 ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteWaitIOTime", "Spill", 1);
         _spill_read_wait_io_timer =
@@ -469,6 +471,7 @@ public:
     RuntimeProfile::Counter* _spill_read_bytes;
     RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
     RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+    RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
 };
 
 class DataSinkOperatorXBase;
@@ -776,6 +779,8 @@ public:
                                                         TUnit::BYTES, "Spill", 
1);
         _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), 
"SpillWriteBlockCount",
                                                           TUnit::UNIT, 
"Spill", 1);
+        _spill_wait_in_queue_timer =
+                ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWaitInQueueTime", "Spill", 1);
         _spill_write_wait_io_timer =
                 ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), 
"SpillWriteWaitIOTime", "Spill", 1);
         _spill_read_wait_io_timer =
@@ -789,6 +794,7 @@ public:
     RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
     RuntimeProfile::Counter* _spill_data_size = nullptr;
     RuntimeProfile::Counter* _spill_block_count = nullptr;
+    RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
     RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
     RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to