This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 5924d31ef47 fix sort spill, support low mem mod of data_stream_recvr 
and improve log
5924d31ef47 is described below

commit 5924d31ef47e7340f779d42a4d8d19b4dd92fc8b
Author: jacktengg <18241664+jackte...@users.noreply.github.com>
AuthorDate: Wed Oct 30 17:51:35 2024 +0800

    fix sort spill, support low mem mod of data_stream_recvr and improve log
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  9 +++---
 be/src/pipeline/exec/operator.h                    |  8 ++++--
 be/src/pipeline/exec/sort_sink_operator.cpp        |  5 ++++
 be/src/pipeline/exec/sort_sink_operator.h          |  4 +++
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  | 10 +++++++
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  3 ++
 .../workload_group/workload_group_manager.cpp      | 23 ++++++++++++----
 be/src/vec/common/sort/sorter.cpp                  | 32 ++++++++++++++++++++++
 be/src/vec/common/sort/sorter.h                    |  4 +++
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  3 ++
 10 files changed, 89 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a334f57859b..360e7087d15 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -23,6 +23,7 @@
 #include "exprs/bloom_filter_func.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/operator.h"
+#include "util/pretty_printer.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/utils/template_helpers.hpp"
@@ -129,10 +130,9 @@ size_t 
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
         const auto allocated_bytes = 
_build_side_mutable_block.allocated_bytes();
         const auto bytes_per_row = bytes / build_block_rows;
         const auto estimated_size_of_next_block = bytes_per_row * 
state->batch_size();
-
-        // If the new size is greater than 95% of allocalted bytes, it maybe 
need to realloc.
+        // If the new size is greater than 85% of allocalted bytes, it maybe 
need to realloc.
         if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes) 
>= 85) {
-            size_to_reserve += bytes + estimated_size_of_next_block;
+            size_to_reserve += (size_t)(allocated_bytes * 1.15);
         }
     }
 
@@ -355,7 +355,8 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     }
 
     LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " 
<< block.columns()
-              << ", bytes/allocated_bytes: " << block.bytes() << "/" << 
block.allocated_bytes();
+              << ", bytes/allocated_bytes: " << 
PrettyPrinter::print_bytes(block.bytes()) << "/"
+              << PrettyPrinter::print_bytes(block.allocated_bytes());
 
     COUNTER_UPDATE(_build_rows_counter, rows);
     block.replace_if_overflow();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7c0bd44a664..054e969ac04 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -302,8 +302,9 @@ public:
             auto* read_file_bytes = 
Base::profile()->get_counter("SpillReadFileBytes");
             Base::_query_statistics->add_spill_bytes(
                     write_block_bytes ? write_block_bytes->value() : 0,
-                    write_file_bytes ? write_file_bytes->value() : 0, 
read_block_bytes->value(),
-                    read_file_bytes->value());
+                    write_file_bytes ? write_file_bytes->value() : 0,
+                    read_block_bytes ? read_block_bytes->value() : 0,
+                    read_file_bytes ? read_file_bytes->value() : 0);
         }
         return Base::close(state);
     }
@@ -747,7 +748,8 @@ public:
             auto* read_block_bytes = 
Base::profile()->get_counter("SpillReadBlockBytes");
             auto* read_file_bytes = 
Base::profile()->get_counter("SpillReadFileBytes");
             Base::_query_statistics->add_spill_bytes(
-                    write_block_bytes->value(), write_file_bytes->value(),
+                    write_block_bytes ? write_block_bytes->value() : 0,
+                    write_file_bytes ? write_file_bytes->value() : 0,
                     read_block_bytes ? read_block_bytes->value() : 0,
                     read_file_bytes ? read_file_bytes->value() : 0);
         }
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index ee8689a8084..d0f30ac48c1 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -148,6 +148,11 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* in
     return Status::OK();
 }
 
+size_t SortSinkOperatorX::get_reserve_mem_size_for_next_sink(RuntimeState* 
state, bool eos) {
+    auto& local_state = get_local_state(state);
+    return local_state._shared_state->sorter->get_reserve_mem_size(state, eos);
+}
+
 size_t SortSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
     return local_state._shared_state->sorter->data_size();
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..43e8e59f3de 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -36,6 +36,8 @@ public:
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
 
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+
 private:
     friend class SortSinkOperatorX;
 
@@ -77,6 +79,8 @@ public:
 
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
+    size_t get_reserve_mem_size_for_next_sink(RuntimeState* state, bool eos);
+
     Status prepare_for_spill(RuntimeState* state);
 
     Status merge_sort_read_for_spill(RuntimeState* state, 
doris::vectorized::Block* block,
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 03e9f33553e..00a60a4c747 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -136,6 +136,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
     return _sort_sink_operator->open(state);
 }
 
+size_t SpillSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& local_state = get_local_state(state);
+    return local_state.get_reserve_mem_size(state, eos);
+}
 Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state,
                                              const 
std::shared_ptr<SpillContext>& spill_context) {
     auto& local_state = get_local_state(state);
@@ -190,6 +194,12 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     return Status::OK();
 }
 
+size_t SpillSortSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& parent = Base::_parent->template cast<Parent>();
+    return 
parent._sort_sink_operator->get_reserve_mem_size_for_next_sink(_runtime_state.get(),
+                                                                          eos);
+}
+
 Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
                                               const 
std::shared_ptr<SpillContext>& spill_context) {
     if (!_shared_state->is_spilled) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 086d93a970c..8984b1e43de 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -41,6 +41,7 @@ public:
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
     Status setup_in_memory_sort_op(RuntimeState* state);
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
     Status revoke_memory(RuntimeState* state, const 
std::shared_ptr<SpillContext>& spill_context);
 
 private:
@@ -86,6 +87,8 @@ public:
         return _sort_sink_operator->set_child(child);
     }
 
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+
     size_t revocable_mem_size(RuntimeState* state) const override;
 
     Status revoke_memory(RuntimeState* state,
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 853f3740551..b2c33f2d378 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -31,6 +31,7 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/mem_info.h"
+#include "util/pretty_printer.h"
 #include "util/threadpool.h"
 #include "util/time.h"
 #include "vec/core/block.h"
@@ -637,12 +638,24 @@ bool 
WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
                 return true;
             } else {
                 // Use MEM_LIMIT_EXCEEDED so that FE could parse the error 
code and do try logic
-                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                        "query({}) reserve memory failed, but could not find 
memory that "
-                        "could "
-                        "release or spill to disk(memory usage:{}, limit: {})",
+                auto msg1 = fmt::format(
+                        "query {} reserve memory failed, but could not find 
memory that could "
+                        "release or spill to disk. Query memory usage: {}, 
limit: {}, process "
+                        "memory info: {}"
+                        ", wg info: {}.",
                         query_id, PrettyPrinter::print_bytes(memory_usage),
-                        
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
+                        PrettyPrinter::print_bytes(query_ctx->get_mem_limit()),
+                        
GlobalMemoryArbitrator::process_memory_used_details_str(),
+                        query_ctx->workload_group()->memory_debug_string());
+                auto msg2 = msg1 + fmt::format(
+                                           " Query Memory Tracker Summary: {}."
+                                           " Load Memory Tracker Summary: {}",
+                                           
MemTrackerLimiter::make_type_trackers_profile_str(
+                                                   
MemTrackerLimiter::Type::QUERY),
+                                           
MemTrackerLimiter::make_type_trackers_profile_str(
+                                                   
MemTrackerLimiter::Type::LOAD));
+                LOG(INFO) << msg2;
+                
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
             }
         } else {
             if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 72bf35f3cba..0a9875c0019 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -215,6 +215,38 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, 
int limit, int64_t offs
         : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, 
nulls_first),
           _state(MergeSorterState::create_unique(row_desc, offset, limit, 
state, profile)) {}
 
+size_t FullSorter::get_reserve_mem_size(RuntimeState* state, bool eos) const {
+    size_t size_to_reserve = 0;
+    const auto rows = _state->unsorted_block_->rows();
+    if (rows != 0) {
+        const auto bytes = _state->unsorted_block_->bytes();
+        const auto allocated_bytes = 
_state->unsorted_block_->allocated_bytes();
+        const auto bytes_per_row = bytes / rows;
+        const auto estimated_size_of_next_block = bytes_per_row * 
state->batch_size();
+        auto new_block_bytes = estimated_size_of_next_block + bytes;
+        auto new_rows = rows + state->batch_size();
+        // If the new size is greater than 85% of allocalted bytes, it maybe 
need to realloc.
+        if ((new_block_bytes * 100 / allocated_bytes) >= 85) {
+            size_to_reserve += (size_t)(allocated_bytes * 1.15);
+        }
+        auto sort = new_rows > buffered_block_size_ || new_block_bytes > 
buffered_block_bytes_;
+        if (sort) {
+            // new column is created when doing sort, reserve average size of 
one column
+            // for estimation
+            size_to_reserve += new_block_bytes / 
_state->unsorted_block_->columns();
+
+            // helping data structures used during sorting
+            size_to_reserve += new_rows * 
sizeof(IColumn::Permutation::value_type);
+
+            auto sort_columns_count = 
_vsort_exec_exprs.lhs_ordering_expr_ctxs().size();
+            if (1 != sort_columns_count) {
+                size_to_reserve += new_rows * sizeof(EqualRangeIterator);
+            }
+        }
+    }
+    return size_to_reserve;
+}
+
 Status FullSorter::append_block(Block* block) {
     DCHECK(block->rows() > 0);
     {
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index aa7d88dfbc2..f89f996fd36 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -121,6 +121,8 @@ public:
 
     virtual size_t data_size() const = 0;
 
+    virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) const { 
return 0; }
+
     // for topn runtime predicate
     const SortDescription& get_sort_description() const { return 
_sort_description; }
     virtual Field get_top_value() { return Field {Field::Types::Null}; }
@@ -171,6 +173,8 @@ public:
 
     size_t data_size() const override;
 
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) const override;
+
     Status merge_sort_read_for_spill(RuntimeState* state, 
doris::vectorized::Block* block,
                                      int batch_size, bool* eos) override;
     void reset() override;
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index a83f8d485a3..fa65175172f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -405,6 +405,9 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, 
int sender_id, int be_n
                                    int64_t packet_seq, 
::google::protobuf::Closure** done,
                                    const int64_t wait_for_worker,
                                    const uint64_t time_to_find_recvr) {
+    if (_parent->state()->get_query_ctx()->low_memory_mode()) {
+        set_low_memory_mode();
+    }
     SCOPED_ATTACH_TASK(_query_thread_context);
     int use_sender_id = _is_merging ? sender_id : 0;
     return _sender_queues[use_sender_id]->add_block(pblock, be_number, 
packet_seq, done,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to