This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8a291efb0c50302d582844a948f489698f76ccbb
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Mon Apr 1 16:14:10 2024 +0800

    [improvement](spill) avoid spill if memory is enough (#33075)
---
 .../exec/partitioned_aggregation_sink_operator.cpp | 30 +++++++++------
 .../exec/partitioned_aggregation_sink_operator.h   |  3 --
 .../partitioned_aggregation_source_operator.cpp    | 29 ++++++---------
 .../exec/partitioned_aggregation_source_operator.h |  2 -
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 43 +++++++++++-----------
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  3 --
 .../pipeline/exec/spill_sort_source_operator.cpp   | 29 ++++-----------
 be/src/pipeline/exec/spill_sort_source_operator.h  |  3 --
 be/src/pipeline/pipeline_x/dependency.h            |  2 +
 be/src/vec/common/sort/sorter.cpp                  |  3 +-
 be/src/vec/spill/spill_stream.cpp                  | 17 +++++++--
 11 files changed, 76 insertions(+), 88 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 3207c109589..d44c35a76a9 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -70,12 +70,16 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* 
state, Status exec_stat
     if (Base::_closed) {
         return Status::OK();
     }
+<<<<<<< HEAD
     {
         std::unique_lock<std::mutex> lk(_spill_lock);
         if (_is_spilling) {
             _spill_cv.wait(lk);
         }
     }
+=======
+    dec_running_big_mem_op_num(state);
+>>>>>>> bb11955709 ([improvement](spill) avoid spill if memory is enough 
(#33075))
     return Base::close(state, exec_status);
 }
 
@@ -166,13 +170,17 @@ Status 
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
     auto* runtime_state = local_state._runtime_state.get();
     RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false));
     if (eos) {
-        LOG(INFO) << "agg node " << id() << " sink eos";
-        if (revocable_mem_size(state) > 0) {
-            RETURN_IF_ERROR(revoke_memory(state));
-        } else {
-            for (auto& partition : 
local_state._shared_state->spill_partitions) {
-                RETURN_IF_ERROR(partition->finish_current_spilling(eos));
+        if (local_state._shared_state->is_spilled) {
+            if (revocable_mem_size(state) > 0) {
+                RETURN_IF_ERROR(revoke_memory(state));
+            } else {
+                for (auto& partition : 
local_state._shared_state->spill_partitions) {
+                    RETURN_IF_ERROR(partition->finish_current_spilling(eos));
+                }
+                local_state._dependency->set_ready_to_read();
+                local_state._finish_dependency->set_ready();
             }
+        } else {
             local_state._dependency->set_ready_to_read();
         }
     }
@@ -229,8 +237,10 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory"
               << ", eos: " << _eos;
     RETURN_IF_ERROR(Base::_shared_state->sink_status);
-    DCHECK(!_is_spilling);
-    _is_spilling = true;
+    if (!_shared_state->is_spilled) {
+        _shared_state->is_spilled = true;
+        profile()->add_info_string("Spilled", "true");
+    }
 
     // TODO: spill thread may set_ready before the task::execute thread put 
the task to blocked state
     if (!_eos) {
@@ -240,7 +250,6 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     Status status;
     Defer defer {[&]() {
         if (!status.ok()) {
-            _is_spilling = false;
             if (!_eos) {
                 Base::_dependency->Dependency::set_ready();
             }
@@ -269,15 +278,12 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
                                   << ", eos: " << _eos;
                     }
                     {
-                        std::unique_lock<std::mutex> lk(_spill_lock);
-                        _is_spilling = false;
                         if (_eos) {
                             Base::_dependency->set_ready_to_read();
                             _finish_dependency->set_ready();
                         } else {
                             Base::_dependency->Dependency::set_ready();
                         }
-                        _spill_cv.notify_one();
                     }
                 }};
                 auto* runtime_state = _runtime_state.get();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 5e617386812..542046556ec 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -272,9 +272,6 @@ public:
 
     bool _eos = false;
     std::shared_ptr<Dependency> _finish_dependency;
-    bool _is_spilling = false;
-    std::mutex _spill_lock;
-    std::condition_variable _spill_cv;
 
     /// Resources in shared state will be released when the operator is closed,
     /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 960decdb951..f5eceac338c 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -88,12 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    {
-        std::unique_lock<std::mutex> lk(_merge_spill_lock);
-        if (_is_merging) {
-            _merge_spill_cv.wait(lk);
-        }
-    }
+    dec_running_big_mem_op_num(state);
     return Base::close(state);
 }
 PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool,
@@ -131,13 +126,16 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
-    
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
+    if (local_state._shared_state->is_spilled) {
+        
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
 
-    /// When `_is_merging` is true means we are reading spilled data and 
merging the data into hash table.
-    if (local_state._is_merging) {
-        return Status::OK();
+        /// When `_is_merging` is true means we are reading spilled data and 
merging the data into hash table.
+        if (local_state._is_merging) {
+            return Status::OK();
+        }
     }
 
+    // not spilled in sink or current partition still has data
     auto* runtime_state = local_state._runtime_state.get();
     RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, 
eos));
     if (local_state._runtime_state) {
@@ -146,7 +144,8 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
         local_state.update_profile(source_local_state->profile());
     }
     if (*eos) {
-        if (!local_state._shared_state->spill_partitions.empty()) {
+        if (local_state._shared_state->is_spilled &&
+            !local_state._shared_state->spill_partitions.empty()) {
             *eos = false;
         }
     }
@@ -218,12 +217,8 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                             }
                             
Base::_shared_state->in_mem_shared_state->aggregate_data_container
                                     ->init_once();
-                            {
-                                std::unique_lock<std::mutex> 
lk(_merge_spill_lock);
-                                _is_merging = false;
-                                _dependency->Dependency::set_ready();
-                                _merge_spill_cv.notify_one();
-                            }
+                            _is_merging = false;
+                            _dependency->Dependency::set_ready();
                         }};
                         bool has_agg_data = false;
                         auto& parent = Base::_parent->template cast<Parent>();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index ac63402f227..eff1e7179c8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -60,8 +60,6 @@ protected:
     std::future<Status> _spill_merge_future;
     bool _current_partition_eos = true;
     bool _is_merging = false;
-    std::mutex _merge_spill_lock;
-    std::condition_variable _merge_spill_cv;
 
     /// Resources in shared state will be released when the operator is closed,
     /// but there may be asynchronous spilling tasks at this time, which can 
lead to conflicts.
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index c586a8e5e56..7764aa948b9 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -74,11 +74,9 @@ Status SpillSortSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 Status SpillSortSinkLocalState::close(RuntimeState* state, Status 
execsink_status) {
-    {
-        std::unique_lock<std::mutex> lk(_spill_lock);
-        if (_is_spilling) {
-            _spill_cv.wait(lk);
-        }
+    auto& parent = Base::_parent->template cast<Parent>();
+    if (parent._enable_spill) {
+        dec_running_big_mem_op_num(state);
     }
     return Status::OK();
 }
@@ -172,9 +170,16 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
             
local_state._shared_state->in_mem_shared_state->sorter->data_size());
     if (eos) {
         if (_enable_spill) {
-            if (revocable_mem_size(state) > 0) {
-                RETURN_IF_ERROR(revoke_memory(state));
+            if (local_state._shared_state->is_spilled) {
+                if (revocable_mem_size(state) > 0) {
+                    RETURN_IF_ERROR(revoke_memory(state));
+                } else {
+                    local_state._dependency->set_ready_to_read();
+                    local_state._finish_dependency->set_ready();
+                }
             } else {
+                RETURN_IF_ERROR(
+                        
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
                 local_state._dependency->set_ready_to_read();
             }
         } else {
@@ -186,8 +191,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     return Status::OK();
 }
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
-    DCHECK(!_is_spilling);
-    _is_spilling = true;
+    if (!_shared_state->is_spilled) {
+        _shared_state->is_spilled = true;
+        profile()->add_info_string("Spilled", "true");
+    }
 
     LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory"
               << ", eos: " << _eos;
@@ -243,17 +250,12 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
                                 _shared_state->clear();
                             }
 
-                            {
-                                std::unique_lock<std::mutex> lk(_spill_lock);
-                                _spilling_stream.reset();
-                                _is_spilling = false;
-                                if (_eos) {
-                                    _dependency->set_ready_to_read();
-                                    _finish_dependency->set_ready();
-                                } else {
-                                    _dependency->Dependency::set_ready();
-                                }
-                                _spill_cv.notify_one();
+                            _spilling_stream.reset();
+                            if (_eos) {
+                                _dependency->set_ready_to_read();
+                                _finish_dependency->set_ready();
+                            } else {
+                                _dependency->Dependency::set_ready();
                             }
                         }};
 
@@ -288,7 +290,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state) {
                         return Status::OK();
                     });
     if (!status.ok()) {
-        _is_spilling = false;
         _spilling_stream->end_spill(status);
 
         if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index ae5a3bcb8c7..d66215411aa 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -62,11 +62,8 @@ private:
     RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
 
     bool _eos = false;
-    bool _is_spilling = false;
     vectorized::SpillStreamSPtr _spilling_stream;
     std::shared_ptr<Dependency> _finish_dependency;
-    std::mutex _spill_lock;
-    std::condition_variable _spill_cv;
 };
 
 class SpillSortSinkOperatorX final : public 
DataSinkOperatorX<SpillSortSinkLocalState> {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index d249b3be56e..417ff704bc6 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -58,17 +58,10 @@ Status SpillSortLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
-    {
-        std::unique_lock<std::mutex> lk(_merge_spill_lock);
-        if (_is_merging) {
-            _merge_spill_cv.wait(lk);
-        }
+    if (Base::_shared_state->enable_spill) {
+        dec_running_big_mem_op_num(state);
     }
     RETURN_IF_ERROR(Base::close(state));
-    for (auto& stream : _current_merging_streams) {
-        
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-    }
-    _current_merging_streams.clear();
     return Status::OK();
 }
 int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
@@ -78,14 +71,11 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() 
const {
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
     auto& parent = Base::_parent->template cast<Parent>();
     LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data";
-    DCHECK(!_is_merging);
-    _is_merging = true;
     _dependency->Dependency::block();
 
     Status status;
     Defer defer {[&]() {
         if (!status.ok()) {
-            _is_merging = false;
             _dependency->Dependency::set_ready();
         }
     }};
@@ -108,12 +98,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
             } else {
                 LOG(INFO) << "sort node " << _parent->node_id() << " merge 
spill data finish";
             }
-            {
-                std::unique_lock<std::mutex> lk(_merge_spill_lock);
-                _is_merging = false;
-                _dependency->Dependency::set_ready();
-                _merge_spill_cv.notify_one();
-            }
+            _dependency->Dependency::set_ready();
         }};
         vectorized::Block merge_sorted_block;
         vectorized::SpillStreamSPtr tmp_stream;
@@ -252,15 +237,15 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Bloc
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
-    if (!local_state.Base::_shared_state->enable_spill) {
-        RETURN_IF_ERROR(
-                
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
-    } else {
+    if (local_state.Base::_shared_state->enable_spill && 
local_state._shared_state->is_spilled) {
         if (!local_state._merger) {
             return local_state.initiate_merge_sort_spill_streams(state);
         } else {
             RETURN_IF_ERROR(local_state._merger->get_next(block, eos));
         }
+    } else {
+        RETURN_IF_ERROR(
+                
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
     }
     local_state.reached_limit(block, eos);
     return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h 
b/be/src/pipeline/exec/spill_sort_source_operator.h
index 8132dd5a56c..a20eb57889b 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -65,9 +65,6 @@ protected:
     int64_t _external_sort_bytes_threshold = 134217728; // 128M
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
-    bool _is_merging = false;
-    std::mutex _merge_spill_lock;
-    std::condition_variable _merge_spill_cv;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     // counters for spill merge sort
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 77cd121be5e..1af48748d6c 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -397,6 +397,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
     size_t partition_count;
     size_t max_partition_index;
     Status sink_status;
+    bool is_spilled = false;
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
     size_t get_partition_index(size_t hash_value) const {
@@ -470,6 +471,7 @@ struct SpillSortSharedState : public BasicSharedState,
 
     SortSharedState* in_mem_shared_state = nullptr;
     bool enable_spill = false;
+    bool is_spilled = false;
     Status sink_status;
     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
 
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 53fc2011232..db3cca8bf09 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -63,6 +63,7 @@ void MergeSorterState::reset() {
     cursors_.swap(empty_cursors);
     std::vector<Block> empty_blocks(0);
     sorted_blocks_.swap(empty_blocks);
+    unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
     in_mem_sorted_bocks_size_ = 0;
 }
 Status MergeSorterState::add_sorted_block(Block& block) {
@@ -70,8 +71,8 @@ Status MergeSorterState::add_sorted_block(Block& block) {
     if (0 == rows) {
         return Status::OK();
     }
-    sorted_blocks_.emplace_back(std::move(block));
     in_mem_sorted_bocks_size_ += block.bytes();
+    sorted_blocks_.emplace_back(std::move(block));
     num_rows_ += rows;
     return Status::OK();
 }
diff --git a/be/src/vec/spill/spill_stream.cpp 
b/be/src/vec/spill/spill_stream.cpp
index d08b63df40b..f245f8fa309 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -68,8 +68,14 @@ void SpillStream::close() {
         read_promise_.reset();
     }
 
-    (void)writer_->close();
-    (void)reader_->close();
+    if (writer_) {
+        (void)writer_->close();
+        writer_.reset();
+    }
+    if (reader_) {
+        (void)reader_->close();
+        reader_.reset();
+    }
 }
 
 const std::string& SpillStream::get_spill_root_dir() const {
@@ -100,13 +106,16 @@ Status SpillStream::spill_block(const Block& block, bool 
eof) {
     size_t written_bytes = 0;
     RETURN_IF_ERROR(writer_->write(block, written_bytes));
     if (eof) {
-        return writer_->close();
+        RETURN_IF_ERROR(writer_->close());
+        writer_.reset();
     }
     return Status::OK();
 }
 
 Status SpillStream::spill_eof() {
-    return writer_->close();
+    RETURN_IF_ERROR(writer_->close());
+    writer_.reset();
+    return Status::OK();
 }
 
 Status SpillStream::read_next_block_sync(Block* block, bool* eos) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to