This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9fc0b9c44629efd2c0a09ade2262e0186a084856
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Oct 11 15:59:38 2024 +0800

    [feat] reserve memory separately for sink and source operators (#41706)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  6 +-
 be/src/pipeline/exec/aggregation_sink_operator.h   |  4 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/exchange_sink_buffer.h        |  4 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |  4 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 80 ++++++++++++++++++----
 be/src/pipeline/exec/hashjoin_build_sink.h         |  4 +-
 be/src/pipeline/exec/operator.h                    | 14 ++--
 .../pipeline/exec/partition_sort_sink_operator.cpp |  2 +-
 .../pipeline/exec/partition_sort_sink_operator.h   |  2 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  4 +-
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 42 ++++++++++--
 .../exec/partitioned_hash_join_probe_operator.h    |  4 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   |  8 +--
 .../exec/partitioned_hash_join_sink_operator.h     |  7 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  2 +-
 be/src/pipeline/exec/set_sink_operator.h           |  2 +-
 be/src/pipeline/pipeline_task.cpp                  | 55 +++++++++++----
 be/src/pipeline/pipeline_task.h                    |  4 +-
 be/src/runtime/query_context.cpp                   | 10 +--
 be/src/vec/common/hash_table/hash_map_context.h    | 56 +++++++++++++++
 be/src/vec/runtime/vdata_stream_recvr.cpp          |  3 +-
 26 files changed, 255 insertions(+), 72 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index bcbf83f6290..3dd43c3c4d8 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -718,7 +718,7 @@ Status AggSinkLocalState::_init_hash_method(const 
vectorized::VExprContextSPtrs&
     return Status::OK();
 }
 
-size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state) const {
+size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) 
const {
     size_t size_to_reserve = std::visit(
             [&](auto&& arg) -> size_t {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
@@ -891,9 +891,9 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState* 
state) {
     return Status::OK();
 }
 
-size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
     auto& local_state = get_local_state(state);
-    return local_state.get_reserve_mem_size(state);
+    return local_state.get_reserve_mem_size(state, eos);
 }
 
 Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 477df890b4f..8bf3a8493fc 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -97,7 +97,7 @@ protected:
     Status _create_agg_status(vectorized::AggregateDataPtr data);
     size_t _memory_usage() const;
 
-    size_t get_reserve_mem_size(RuntimeState* state) const;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) const;
 
     RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
@@ -170,7 +170,7 @@ public:
 
     Status reset_hash_table(RuntimeState* state);
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
     using DataSinkOperatorX<AggSinkLocalState>::node_id;
     using DataSinkOperatorX<AggSinkLocalState>::operator_id;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index e11ffe0c48a..63b93ad4a59 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -332,7 +332,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
     return Status::OK();
 }
 
-size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
     auto& local_state = get_local_state(state);
     return local_state._reserve_mem_size;
 }
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index dee7ee27c30..f5b53314450 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -95,7 +95,7 @@ public:
         return !_partition_by_eq_expr_ctxs.empty() && 
_order_by_eq_expr_ctxs.empty();
     }
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
 private:
     Status _insert_range_column(vectorized::Block* block, const 
vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 23ba2220bec..60193b1d7e6 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -91,7 +91,7 @@ public:
 
     void set_low_memory_mode() {
         _total_queue_buffer_size_limit = 1024 * 1024;
-        _total_queue_blocks_count_limit = 1;
+        _total_queue_blocks_count_limit = 8;
     }
 
     void acquire(BroadcastPBlockHolder& holder);
@@ -207,7 +207,7 @@ public:
         _set_ready_to_finish(_busy_channels == 0);
     }
 
-    void set_low_memory_mode() { _queue_capacity = 1; }
+    void set_low_memory_mode() { _queue_capacity = 8; }
 
 private:
     friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index ff25a252cca..dd7b62bcadf 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -17,6 +17,8 @@
 
 #include "exchange_source_operator.h"
 
+#include <fmt/core.h>
+
 #include <memory>
 
 #include "pipeline/exec/operator.h"
@@ -71,7 +73,7 @@ Status ExchangeLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
timer_name, 1);
     for (size_t i = 0; i < queues.size(); i++) {
         deps[i] = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
-                                            "SHUFFLE_DATA_DEPENDENCY");
+                                            
fmt::format("SHUFFLE_DATA_DEPENDENCY_{}", i));
         queues[i]->set_dependency(deps[i]);
         metrics[i] = 
_runtime_profile->add_nonzero_counter(fmt::format("WaitForData{}", i),
                                                            TUnit ::TIME_NS, 
timer_name, 1);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 3132b062af3..79c068a83c1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -19,9 +19,11 @@
 
 #include <string>
 
+#include "common/exception.h"
 #include "exprs/bloom_filter_func.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/operator.h"
+#include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/utils/template_helpers.hpp"
 
@@ -110,7 +112,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
-size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
+size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, 
bool eos) {
     if (!_should_build_hash_table) {
         return 0;
     }
@@ -121,26 +123,74 @@ size_t 
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
 
     size_t size_to_reserve = 0;
 
-    if (!_build_side_mutable_block.empty()) {
+    const size_t build_block_rows = _build_side_mutable_block.rows();
+    if (build_block_rows != 0) {
         const auto bytes = _build_side_mutable_block.bytes();
         const auto allocated_bytes = 
_build_side_mutable_block.allocated_bytes();
-        if (allocated_bytes != 0 && ((bytes * 100) / allocated_bytes) >= 85) {
-            size_to_reserve += bytes;
+        const auto bytes_per_row = bytes / build_block_rows;
+        const auto estimated_size_of_next_block = bytes_per_row * 
state->batch_size();
+
+        // If the new size is greater than 95% of allocalted bytes, it maybe 
need to realloc.
+        if (((estimated_size_of_next_block + bytes) * 100 / allocated_bytes) 
>= 95) {
+            size_to_reserve += bytes + estimated_size_of_next_block;
         }
     }
 
-    const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
-    size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
+    if (eos) {
+        const size_t rows = build_block_rows + state->batch_size();
+        size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
 
-    size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first
-    size_to_reserve += rows * sizeof(uint32_t);        // JoinHashTable::next
+        size_to_reserve += bucket_size * sizeof(uint32_t); // 
JoinHashTable::first
+        size_to_reserve += rows * sizeof(uint32_t);        // 
JoinHashTable::next
 
-    auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
-    if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN ||
-        p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) {
-        size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
+        auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
+        if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN ||
+            p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) {
+            size_to_reserve += rows * sizeof(uint8_t); // 
JoinHashTable::visited
+        }
+        size_to_reserve += _evaluate_mem_usage;
+
+        vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
+
+        if (build_block_rows > 0) {
+            auto block = _build_side_mutable_block.to_block();
+            Defer defer([&]() {
+                _build_side_mutable_block = 
vectorized::MutableBlock(std::move(block));
+            });
+            vectorized::ColumnUInt8::MutablePtr null_map_val;
+            if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN) {
+                _convert_block_to_null(block);
+                // first row is mocked
+                for (int i = 0; i < block.columns(); i++) {
+                    auto [column, is_const] = 
unpack_if_const(block.safe_get_by_position(i).column);
+                    
assert_cast<vectorized::ColumnNullable*>(column->assume_mutable().get())
+                            ->get_null_map_column()
+                            .get_data()
+                            .data()[0] = 1;
+                }
+            }
+
+            null_map_val = vectorized::ColumnUInt8::create();
+            null_map_val->get_data().assign(build_block_rows, (uint8_t)0);
+
+            // Get the key column that needs to be built
+            Status st = _extract_join_column(block, null_map_val, raw_ptrs, 
_build_col_ids);
+            if (!st.ok()) {
+                throw Exception(st);
+            }
+
+            std::visit(vectorized::Overload {[&](std::monostate& arg) {
+                                                 LOG(FATAL) << "FATAL: 
uninited hash table";
+                                                 __builtin_unreachable();
+                                             },
+                                             [&](auto&& hash_map_context) {
+                                                 size_to_reserve += 
hash_map_context.estimated_size(
+                                                         raw_ptrs, 
block.rows(), true, true,
+                                                         bucket_size);
+                                             }},
+                       *_shared_state->hash_table_variants);
+        }
     }
-    size_to_reserve += _evaluate_mem_usage;
     return size_to_reserve;
 }
 
@@ -702,9 +752,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     return Status::OK();
 }
 
-size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t HashJoinBuildSinkOperatorX::get_reserve_mem_size(RuntimeState* state, 
bool eos) {
     auto& local_state = get_local_state(state);
-    return local_state.get_reserve_mem_size(state);
+    return local_state.get_reserve_mem_size(state, eos);
 }
 
 size_t HashJoinBuildSinkOperatorX::get_memory_usage(RuntimeState* state) const 
{
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 0ced9a77207..b4a60fa362d 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -54,7 +54,7 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
-    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
 
 protected:
     void _hash_table_init(RuntimeState* state);
@@ -124,7 +124,7 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
     [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 984d2f00bad..649bc70c238 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -109,11 +109,6 @@ public:
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
 
-    // If this method is not overwrite by child, its default value is 1MB
-    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
-        return state->minimum_operator_memory_required_bytes();
-    }
-
     virtual Status revoke_memory(RuntimeState* state,
                                  const std::shared_ptr<SpillContext>& 
spill_context) {
         return Status::OK();
@@ -480,6 +475,10 @@ public:
     [[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
                                                    LocalSinkStateInfo& info) = 
0;
 
+    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, 
bool eos) {
+        return state->minimum_operator_memory_required_bytes();
+    }
+
     template <class TARGET>
     TARGET& cast() {
         DCHECK(dynamic_cast<TARGET*>(this))
@@ -749,6 +748,11 @@ public:
         return Status::OK();
     }
 
+    // If this method is not overwrite by child, its default value is 1MB
+    [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) {
+        return state->minimum_operator_memory_required_bytes();
+    }
+
     virtual std::string debug_string(int indentation_level = 0) const;
 
     virtual std::string debug_string(RuntimeState* state, int 
indentation_level = 0) const;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index f62d5d13600..129743c3d7d 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -249,7 +249,7 @@ Status 
PartitionSortSinkOperatorX::_split_block_by_partition(
     return Status::OK();
 }
 
-size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t PartitionSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, 
bool eos) {
     auto& local_state = get_local_state(state);
     auto rows = state->batch_size();
     size_t reserve_mem_size = std::visit(
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 5eccb6ac9ca..0f1f3cd1b2c 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -274,7 +274,7 @@ public:
         return {ExchangeType::PASSTHROUGH};
     }
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
 private:
     friend class PartitionSortSinkLocalState;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index f84a3636800..1a86fdb2a9d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -254,10 +254,10 @@ Status 
PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state)
     return sink_local_state->open(state);
 }
 
-size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
+size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, 
bool eos) {
     auto& local_state = get_local_state(state);
     auto* runtime_state = local_state._runtime_state.get();
-    auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state);
+    auto size = _agg_sink_operator->get_reserve_mem_size(runtime_state, eos);
     COUNTER_SET(local_state._memory_usage_reserved, int64_t(size));
     return size;
 }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 1124ba48c35..9b70da54943 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -346,7 +346,7 @@ public:
     Status revoke_memory(RuntimeState* state,
                          const std::shared_ptr<SpillContext>& spill_context) 
override;
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
 private:
     friend class PartitionedAggSinkLocalState;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bb65f098151..7b60c9a3e2f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -23,6 +23,7 @@
 #include <algorithm>
 #include <utility>
 
+#include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
@@ -67,6 +68,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* 
state, LocalStateI
     _spill_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillProbeTime", 1);
     _recovery_probe_blocks = ADD_COUNTER(profile(), 
"SpillRecoveryProbeBlocks", TUnit::UNIT);
     _recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"SpillRecoveryProbeTime", 1);
+    _get_child_next_timer = ADD_TIMER_WITH_LEVEL(profile(), 
"GetChildNextTime", 1);
+
     _memory_usage_reserved =
             ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", 
TUnit::UNIT, 1);
 
@@ -870,6 +873,35 @@ size_t 
PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
     return mem_size;
 }
 
+size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* 
state) {
+    auto& local_state = get_local_state(state);
+    const auto need_to_spill = local_state._shared_state->need_to_spill;
+    if (!need_to_spill || !local_state._child_eos) {
+        return Base::get_reserve_mem_size(state);
+    }
+
+    size_t size_to_reserve = 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM;
+
+    if (local_state._need_to_setup_internal_operators) {
+        const size_t rows =
+                (local_state._recovered_build_block ? 
local_state._recovered_build_block->rows()
+                                                    : 0) +
+                state->batch_size();
+        size_t bucket_size = JoinHashTable<StringRef>::calc_bucket_size(rows);
+
+        size_to_reserve += bucket_size * sizeof(uint32_t); // 
JoinHashTable::first
+        size_to_reserve += rows * sizeof(uint32_t);        // 
JoinHashTable::next
+
+        if (_join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == 
TJoinOp::RIGHT_OUTER_JOIN ||
+            _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == 
TJoinOp::RIGHT_SEMI_JOIN) {
+            size_to_reserve += rows * sizeof(uint8_t); // 
JoinHashTable::visited
+        }
+    }
+
+    COUNTER_SET(local_state._memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
 Status PartitionedHashJoinProbeOperatorX::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     auto& local_state = get_local_state(state);
@@ -925,7 +957,6 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
                                                     bool* eos) {
     *eos = false;
     auto& local_state = get_local_state(state);
-    SCOPED_TIMER(local_state.exec_time_counter());
     const auto need_to_spill = local_state._shared_state->need_to_spill;
 #ifndef NDEBUG
     Defer eos_check_defer([&] {
@@ -944,16 +975,19 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     });
 
     if (need_more_input_data(state)) {
-        RETURN_IF_ERROR(_child->get_block_after_projects(state, 
local_state._child_block.get(),
-                                                         
&local_state._child_eos));
+        {
+            SCOPED_TIMER(local_state._get_child_next_timer);
+            RETURN_IF_ERROR(_child->get_block_after_projects(state, 
local_state._child_block.get(),
+                                                             
&local_state._child_eos));
+        }
 
+        SCOPED_TIMER(local_state.exec_time_counter());
         if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
             return Status::OK();
         }
 
         Defer defer([&] { local_state._child_block->clear_column_data(); });
         if (need_to_spill) {
-            SCOPED_TIMER(local_state.exec_time_counter());
             RETURN_IF_ERROR(push(state, local_state._child_block.get(), 
local_state._child_eos));
             if (_should_revoke_memory(state)) {
                 return _revoke_memory(state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 0ffa446f181..e66b730685b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -144,6 +144,8 @@ private:
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
     RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
+
+    RuntimeProfile::Counter* _get_child_next_timer = nullptr;
 };
 
 class PartitionedHashJoinProbeOperatorX final
@@ -183,6 +185,8 @@ public:
 
     size_t revocable_mem_size(RuntimeState* state) const override;
 
+    size_t get_reserve_mem_size(RuntimeState* state) override;
+
     void set_inner_operators(const 
std::shared_ptr<HashJoinBuildSinkOperatorX>& sink_operator,
                              const std::shared_ptr<HashJoinProbeOperatorX>& 
probe_operator) {
         _inner_sink_operator = sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index b02c7ee0971..f7d38fe9d5d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -117,7 +117,7 @@ size_t 
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
-size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state) {
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
     size_t size_to_reserve = 0;
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     if (_shared_state->need_to_spill) {
@@ -125,7 +125,7 @@ size_t 
PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* sta
     } else {
         if (_shared_state->inner_runtime_state) {
             size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
-                    _shared_state->inner_runtime_state.get());
+                    _shared_state->inner_runtime_state.get(), eos);
         }
     }
 
@@ -680,9 +680,9 @@ Status PartitionedHashJoinSinkOperatorX::revoke_memory(
     return local_state.revoke_memory(state, spill_context);
 }
 
-size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* 
state) {
+size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
     auto& local_state = get_local_state(state);
-    return local_state.get_reserve_mem_size(state);
+    return local_state.get_reserve_mem_size(state, eos);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index d8cb67ca08d..d3725997882 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -25,6 +25,7 @@
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/join_build_sink_operator.h"
 #include "pipeline/exec/spill_utils.h"
+#include "vec/core/block.h"
 #include "vec/runtime/partitioner.h"
 
 namespace doris {
@@ -45,7 +46,7 @@ public:
     Status close(RuntimeState* state, Status exec_status) override;
     Status revoke_memory(RuntimeState* state, const 
std::shared_ptr<SpillContext>& spill_context);
     size_t revocable_mem_size(RuntimeState* state) const;
-    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state);
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
 
 protected:
     PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state)
@@ -69,6 +70,8 @@ protected:
     std::atomic<bool> _spill_status_ok {true};
     std::mutex _spill_lock;
 
+    vectorized::Block _pending_block;
+
     bool _child_eos {false};
 
     Status _spill_status;
@@ -110,7 +113,7 @@ public:
     Status revoke_memory(RuntimeState* state,
                          const std::shared_ptr<SpillContext>& spill_context) 
override;
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
     DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 7e62892f516..1db187ef307 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -196,7 +196,7 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
 }
 
 template <bool is_intersect>
-size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* 
state) {
+size_t SetProbeSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
     auto& local_state = get_local_state(state);
     return local_state._estimate_memory_usage;
 }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 542f6652dd4..5ba248d9fda 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -102,7 +102,7 @@ public:
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override { 
return nullptr; }
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index e15cecd22ed..df5d8d44f0a 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -207,7 +207,7 @@ Status SetSinkOperatorX<is_intersect>::init(const 
TPlanNode& tnode, RuntimeState
 }
 
 template <bool is_intersect>
-size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* 
state) {
+size_t SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
     auto& local_state = get_local_state(state);
     size_t size_to_reserve = std::visit(
             [&](auto&& arg) -> size_t {
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index be600ff6fd5..ed9aa1afead 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -95,7 +95,7 @@ public:
     }
     bool require_shuffled_data_distribution() const override { return true; }
 
-    size_t get_reserve_mem_size(RuntimeState* state) override;
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
 private:
     template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 5509fcb19ed..b3282810d86 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -43,6 +43,7 @@
 #include "util/mem_info.h"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h"
+#include "vec/core/block.h"
 #include "vec/spill/spill_stream.h"
 
 namespace doris {
@@ -316,7 +317,7 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_ATTACH_TASK(_state);
     _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
     *eos = _eos;
-    if (_eos) {
+    if (_eos && !_pending_block) {
         // If task is waken up by finish dependency, `_eos` is set to true by 
last execution, and we should return here.
         return Status::OK();
     }
@@ -384,10 +385,8 @@ Status PipelineTask::execute(bool* eos) {
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task 
executing failed");
             return status;
         });
-
-        DEFER_RELEASE_RESERVED();
         // Every loop should check if memory is not enough.
-        _state->get_query_ctx()->update_low_memory_mode();
+        // _state->get_query_ctx()->update_low_memory_mode();
 
         // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
@@ -396,13 +395,19 @@ Status PipelineTask::execute(bool* eos) {
         if (_dry_run || _sink->is_finished(_state)) {
             *eos = true;
             _eos = true;
+        } else if (_pending_block) [[unlikely]] {
+            LOG(INFO) << "query: " << print_id(query_id)
+                      << " has pending block, size: " << 
_pending_block->allocated_bytes();
+            _block = std::move(_pending_block);
+            block = _block.get();
         } else {
             SCOPED_TIMER(_get_block_timer);
+            DEFER_RELEASE_RESERVED();
             _get_block_counter->update(1);
-            size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
-            sink_reserve_size =
-                    std::max(sink_reserve_size, 
_state->minimum_operator_memory_required_bytes());
-            reserve_size = _root->get_reserve_mem_size(_state) + 
sink_reserve_size;
+            // size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
+            // sink_reserve_size =
+            //         std::max(sink_reserve_size, 
_state->minimum_operator_memory_required_bytes());
+            reserve_size = _root->get_reserve_mem_size(_state);
             _root->reset_reserve_mem_size(_state);
 
             auto workload_group = _state->get_query_ctx()->workload_group();
@@ -414,14 +419,14 @@ Status PipelineTask::execute(bool* eos) {
                     COUNTER_UPDATE(_memory_reserve_failed_times, 1);
                     LOG(INFO) << "query: " << print_id(query_id) << ", try to 
reserve: "
                               << PrettyPrinter::print(reserve_size, 
TUnit::BYTES)
-                              << "(sink reserve size:("
-                              << PrettyPrinter::print(sink_reserve_size, 
TUnit::BYTES)
-                              << "), sink name: " << _sink->get_name()
-                              << ", node id: " << _sink->node_id() << " 
failed: " << st.to_string()
+                              << ", sink name: " << _sink->get_name()
+                              << ", node id: " << _sink->node_id()
+                              << ", task id: " << _state->task_id()
+                              << ", failed: " << st.to_string()
                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
 
                     _state->get_query_ctx()->update_paused_reason(st);
-                    _state->get_query_ctx()->set_low_memory_mode();
+                    // _state->get_query_ctx()->set_low_memory_mode();
                     bool is_high_wartermark = false;
                     bool is_low_wartermark = false;
                     workload_group->check_mem_used(&is_low_wartermark, 
&is_high_wartermark);
@@ -440,6 +445,28 @@ Status PipelineTask::execute(bool* eos) {
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
             Status status = Status::OK();
+            DEFER_RELEASE_RESERVED();
+            COUNTER_UPDATE(_memory_reserve_times, 1);
+            size_t sink_reserve_size = _sink->get_reserve_mem_size(_state, 
*eos);
+            status = thread_context()->try_reserve_memory(sink_reserve_size);
+            if (!status.ok()) {
+                LOG(INFO) << "query: " << print_id(query_id) << ", try to 
reserve: "
+                          << PrettyPrinter::print(sink_reserve_size, 
TUnit::BYTES)
+                          << ", sink name: " << _sink->get_name()
+                          << ", node id: " << _sink->node_id() << ", task id: 
" << _state->task_id()
+                          << ", failed: " << status.to_string()
+                          << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                _state->get_query_ctx()->update_paused_reason(status);
+                _memory_sufficient_dependency->block();
+                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                        _state->get_query_ctx()->shared_from_this(), 
sink_reserve_size);
+                _pending_block = std::move(_block);
+                _block = vectorized::Block::create_unique();
+                _eos = *eos;
+                *eos = false;
+                continue;
+            }
+
             // Define a lambda function to catch sink exception, because sink 
will check
             // return error status with EOF, it is special, could not return 
directly.
             auto sink_function = [&]() -> Status {
@@ -570,7 +597,7 @@ std::string PipelineTask::debug_string() {
 }
 
 size_t PipelineTask::get_revocable_size() const {
-    if (_running || _eos) {
+    if (_running || (_eos && !_pending_block)) {
         return 0;
     }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 448b2ec5f6f..44dfdd7832a 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -261,7 +261,9 @@ private:
     RuntimeState* _state = nullptr;
     int _previous_schedule_id = -1;
     uint32_t _schedule_time = 0;
-    std::unique_ptr<doris::vectorized::Block> _block;
+    std::unique_ptr<vectorized::Block> _block;
+    std::unique_ptr<vectorized::Block> _pending_block;
+
     PipelineFragmentContext* _fragment_context = nullptr;
     TaskQueue* _task_queue = nullptr;
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2ea548fd6f5..61d961b6c0e 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -472,7 +472,7 @@ Status QueryContext::revoke_memory() {
     // Do not use memlimit, use current memory usage.
     // For example, if current limit is 1.6G, but current used is 1G, if 
reserve failed
     // should free 200MB memory, not 300MB
-    //const int64_t target_revoking_size = 
(int64_t)(query_mem_tracker->consumption() * 0.2);
+    const auto target_revoking_size = 
(int64_t)(query_mem_tracker->consumption() * 0.2);
     size_t revoked_size = 0;
 
     std::vector<pipeline::PipelineTask*> chosen_tasks;
@@ -481,10 +481,10 @@ Status QueryContext::revoke_memory() {
 
         revoked_size += revocable_size;
         // Only revoke the largest task to ensure memory is used as much as 
possible
-        break;
-        //if (revoked_size >= target_revoking_size) {
-        //    break;
-        //}
+        // break;
+        if (revoked_size >= target_revoking_size) {
+            break;
+        }
     }
 
     std::weak_ptr<QueryContext> this_ctx = shared_from_this();
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 2d0b46150b1..bd5bab0be97 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -73,6 +73,10 @@ struct MethodBaseInner {
                                       const uint8_t* null_map = nullptr, bool 
is_join = false,
                                       bool is_build = false, uint32_t 
bucket_size = 0) = 0;
 
+    [[nodiscard]] virtual size_t estimated_size(const ColumnRawPtrs& 
key_columns, size_t num_rows,
+                                                bool is_join = false, bool 
is_build = false,
+                                                uint32_t bucket_size = 0) = 0;
+
     virtual size_t serialized_keys_size(bool is_build) const { return 0; }
 
     void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const 
uint8_t* null_map) {
@@ -215,6 +219,22 @@ struct MethodSerialized : public MethodBase<TData> {
         return {begin, sum_size};
     }
 
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, 
bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        size_t size = 0;
+        for (const auto& column : key_columns) {
+            size += column->byte_size();
+        }
+
+        size += sizeof(StringRef) * num_rows; // stored_keys
+        if (is_join) {
+            size += sizeof(uint32_t) * num_rows; // bucket_nums
+        } else {
+            size += sizeof(size_t) * num_rows; // hash_values
+        }
+        return size;
+    }
+
     void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                                    DorisVector<StringRef>& input_keys, Arena& 
input_arena) {
         input_arena.clear();
@@ -299,6 +319,18 @@ struct MethodStringNoCache : public MethodBase<TData> {
                         : (_stored_keys.size() * sizeof(StringRef));
     }
 
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, 
bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        size_t size = 0;
+        size += sizeof(StringRef) * num_rows; // stored_keys
+        if (is_join) {
+            size += sizeof(uint32_t) * num_rows; // bucket_nums
+        } else {
+            size += sizeof(size_t) * num_rows; // hash_values
+        }
+        return size;
+    }
+
     void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                                    DorisVector<StringRef>& stored_keys) {
         const IColumn& column = *key_columns[0];
@@ -354,6 +386,17 @@ struct MethodOneNumber : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodOneNumber<typename Base::Value, 
typename Base::Mapped,
                                                       FieldType>;
 
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, 
bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        size_t size = 0;
+        if (is_join) {
+            size += sizeof(uint32_t) * num_rows; // bucket_nums
+        } else {
+            size += sizeof(size_t) * num_rows; // hash_values
+        }
+        return size;
+    }
+
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                               const uint8_t* null_map = nullptr, bool is_join 
= false,
                               bool is_build = false, uint32_t bucket_size = 0) 
override {
@@ -468,6 +511,19 @@ struct MethodKeysFixed : public MethodBase<TData> {
         return (is_build ? build_stored_keys.size() : stored_keys.size()) *
                sizeof(typename Base::Key);
     }
+
+    size_t estimated_size(const ColumnRawPtrs& key_columns, size_t num_rows, 
bool is_join,
+                          bool is_build, uint32_t bucket_size) override {
+        size_t size = 0;
+        size += sizeof(StringRef) * num_rows; // stored_keys
+        if (is_join) {
+            size += sizeof(uint32_t) * num_rows; // bucket_nums
+        } else {
+            size += sizeof(size_t) * num_rows; // hash_values
+        }
+        return size;
+    }
+
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
                               const uint8_t* null_map = nullptr, bool is_join 
= false,
                               bool is_build = false, uint32_t bucket_size = 0) 
override {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 6ff1fcc0aca..a83f8d485a3 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -347,7 +347,8 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* 
stream_mgr, pipeline::Exchang
     _sender_to_local_channel_dependency.resize(num_queues);
     for (size_t i = 0; i < num_queues; i++) {
         _sender_to_local_channel_dependency[i] = 
pipeline::Dependency::create_shared(
-                _dest_node_id, _dest_node_id, 
"LocalExchangeChannelDependency", true);
+                _dest_node_id, _dest_node_id, 
fmt::format("LocalExchangeChannelDependency_{}", i),
+                true);
     }
     _sender_queues.reserve(num_queues);
     int num_sender_per_queue = is_merging ? 1 : num_senders;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to