This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 7041d39da9f Spill updated (#40798)
7041d39da9f is described below

commit 7041d39da9f7120b67856c753425c1089199206d
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Sep 13 16:10:09 2024 +0800

    Spill updated (#40798)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h                       |  3 +-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  8 ++--
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |  4 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |  2 +-
 be/src/pipeline/exec/datagen_operator.cpp          |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  9 -----
 .../exec/nested_loop_join_probe_operator.cpp       |  8 ++--
 be/src/pipeline/exec/operator.h                    | 22 +----------
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../partitioned_aggregation_source_operator.cpp    | 23 +++++++++--
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 45 ++++++++++++----------
 be/src/pipeline/exec/repeat_operator.cpp           |  4 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |  2 +-
 .../exec/streaming_aggregation_operator.cpp        |  4 +-
 be/src/pipeline/exec/union_source_operator.cpp     |  2 +-
 be/src/pipeline/pipeline_task.cpp                  | 23 +++++++----
 be/src/vec/core/block.cpp                          | 17 ++++----
 25 files changed, 102 insertions(+), 95 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f7990c097ef..547271d87fb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -453,7 +453,8 @@ struct PartitionedAggSharedState : public BasicSharedState,
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
     size_t get_partition_index(size_t hash_value) const {
-        return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        // return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        return hash_value % partition_count;
     }
 };
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 166187ffc6d..e1708421b0d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -178,7 +178,7 @@ Status 
AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
     SCOPED_TIMER(_build_timer);
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
         
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
                 block,
@@ -191,7 +191,7 @@ Status 
AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
 
 Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) 
{
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _merge_with_serialized_key_helper<true, false>(block);
     } else {
@@ -401,7 +401,7 @@ Status 
AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
 
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) 
{
         if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
             int col_id = AggSharedState::get_slot_column_id(
@@ -440,7 +440,7 @@ void AggSinkLocalState::_update_memusage_without_key() {
 
 Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* 
block) {
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _execute_with_serialized_key_helper<true>(block);
     } else {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index ea54281c40c..0071f7bfc03 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -126,7 +126,7 @@ protected:
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
 
-    size_t _memory_usage_last_executing = 0;
+    int64_t _memory_usage_last_executing = 0;
 };
 
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index c1cb187f3e6..6df089bbb5b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -440,7 +440,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
     local_state.make_nullable_output_key(block);
     // dispose the having clause, should not be execute in prestreaming agg
@@ -482,7 +482,7 @@ void 
AggLocalState::make_nullable_output_key(vectorized::Block* block) {
 template <bool limit>
 Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* 
block) {
     SCOPED_TIMER(_merge_timer);
-    ScopedMemTracker scoped_tracker(_estimate_memory_usage);
+    SCOPED_PEAK_MEM(&_estimate_memory_usage);
 
     size_t key_size = Base::_shared_state->probe_expr_ctxs.size();
     vectorized::ColumnRawPtrs key_columns(key_size);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 04f33d2f15b..b7623651cfc 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -269,7 +269,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
 
     local_state._reserve_mem_size = 0;
-    ScopedMemTracker mem_tracker(local_state._reserve_mem_size);
+    SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
 
     local_state._shared_state->input_eos = eos;
     if (local_state._shared_state->input_eos && input_block->rows() == 0) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 9d812e7cc28..93c68539144 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -62,7 +62,7 @@ private:
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
 
-    size_t _reserve_mem_size = 0;
+    int64_t _reserve_mem_size = 0;
 };
 
 class AnalyticSinkOperatorX final : public 
DataSinkOperatorX<AnalyticSinkLocalState> {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index caacf06b8ea..0155190f042 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -512,7 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* 
state, vectorized::Block
                                           bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     if (local_state._shared_state->input_eos &&
         (local_state._output_block_index == 
local_state._shared_state->input_blocks.size() ||
          local_state._shared_state->input_total_rows == 0)) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 6c6a28029e2..f83569b1b34 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -42,7 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                                     bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     local_state.add_num_rows_returned(block->rows());
     int64_t num_rows_returned = local_state.num_rows_returned();
     bool assert_res = false;
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index ba3ab5e42da..466f16c82fd 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -68,7 +68,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block*
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     Status res = local_state._table_func->get_next(state, block, eos);
     RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
     local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6c132649ddb..e4a9e5df72d 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -137,15 +137,6 @@ size_t 
HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
         size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
     }
     size_to_reserve += _evaluate_mem_usage;
-
-    if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
-        LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", 
rows: " << rows
-                  << ", bucket_size: " << bucket_size
-                  << ", mutable block size: " << 
_build_side_mutable_block.allocated_bytes()
-                  << ", mutable block cols: " << 
_build_side_mutable_block.columns()
-                  << ", _build_col_ids.size: " << _build_col_ids.size();
-    }
-
     return size_to_reserve;
 }
 
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index c5642b8a731..9e5964841ef 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -472,7 +472,7 @@ Status 
NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
                                           bool eos) const {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     local_state._cur_probe_row_visited_flags.resize(block->rows());
     std::fill(local_state._cur_probe_row_visited_flags.begin(),
               local_state._cur_probe_row_visited_flags.end(), 0);
@@ -499,12 +499,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
                                           bool* eos) const {
     auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
-        ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), 
block));
         *eos = local_state._shared_state->left_side_eos;
         local_state._need_more_input_data = 
!local_state._shared_state->left_side_eos;
     } else {
-        ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         *eos = ((_match_all_build || _is_right_semi_anti)
                         ? local_state._output_null_idx_build_side ==
                                           
local_state._shared_state->build_blocks.size() &&
@@ -537,7 +537,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* 
state, vectorized::Block
                                 state, join_op_variants);
             };
             SCOPED_TIMER(local_state._loop_join_timer);
-            ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
+            SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
             RETURN_IF_ERROR(std::visit(
                     func, local_state._shared_state->join_op_variants,
                     vectorized::make_bool_variant(_match_all_build || 
_is_right_semi_anti),
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4932504b424..dca3e136526 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -199,7 +199,7 @@ public:
 
     void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage 
+= usage; }
 
-    size_t& estimate_memory_usage() { return _estimate_memory_usage; }
+    int64_t& estimate_memory_usage() { return _estimate_memory_usage; }
 
     void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
 
@@ -210,7 +210,7 @@ protected:
 
     ObjectPool* _pool = nullptr;
     int64_t _num_rows_returned {0};
-    size_t _estimate_memory_usage {0};
+    int64_t _estimate_memory_usage {0};
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
@@ -243,24 +243,6 @@ protected:
     vectorized::Block _origin_block;
 };
 
-class ScopedMemTracker {
-public:
-    ScopedMemTracker(size_t& counter) : _counter(counter), 
_mem_tracker("ScopedMemTracker") {
-        
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker);
-        _peak_usage = _mem_tracker.peak_consumption();
-    }
-
-    ~ScopedMemTracker() {
-        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
-        _counter += (_mem_tracker.peak_consumption() - _peak_usage);
-    }
-
-private:
-    size_t& _counter;
-    size_t _peak_usage = 0;
-    MemTracker _mem_tracker;
-};
-
 template <typename SharedStateArg = FakeSharedState>
 class PipelineXLocalState : public PipelineXLocalStateBase {
 public:
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 4bf2e41befc..378ca3b1d20 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -348,6 +348,6 @@ private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
 
-    size_t _spill_partition_count_bits = 6;
+    size_t _spill_partition_count_bits = 5;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 333f98a66cb..d83af9f6c4e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -207,7 +207,7 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
     _is_merging = true;
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << 
_parent->node_id()
-               << " merge spilled agg data";
+               << ", task id: " << _state->task_id() << " merge spilled agg data";
 
     
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
     _spill_dependency->Dependency::block();
@@ -218,6 +218,9 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     submit_timer.start();
     auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        MonotonicStopWatch execution_timer;
+        execution_timer.start();
+        size_t read_size = 0;
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
@@ -226,9 +229,13 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                                  << " merge spilled agg data error: " << 
_status;
                 }
                 _shared_state->close();
-            } else if (_shared_state->spill_partitions.empty()) {
+            } else {
                 VLOG_DEBUG << "query " << print_id(query_id) << " agg node " 
<< _parent->node_id()
-                           << " merge spilled agg data finish";
+                           << ", task id: " << _state->task_id()
+                           << " merge spilled agg data finish, time used: "
+                           << (execution_timer.elapsed_time() / (1000L * 1000 
* 1000))
+                           << "s, read size: " << read_size << ", "
+                           << _shared_state->spill_partitions.size() << " 
partitions left";
             }
             
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
             _is_merging = false;
@@ -261,6 +268,7 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
                     if (!block.empty()) {
                         has_agg_data = true;
+                        read_size += block.bytes();
                         _status = parent._agg_source_operator
                                           
->merge_with_serialized_key_helper<false>(
                                                   _runtime_state.get(), 
&block);
@@ -268,6 +276,15 @@ Status 
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                     }
                 }
                 
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+
+                if (!has_agg_data) {
+                    VLOG_DEBUG << "query " << print_id(query_id) << " agg node 
"
+                               << _parent->node_id() << ", task id: " << 
_state->task_id()
+                               << " merge spilled agg data finish, time used: "
+                               << execution_timer.elapsed_time() << ", empty 
partition "
+                               << read_size << ", " << 
_shared_state->spill_partitions.size()
+                               << " partitions left";
+                }
             }
             _shared_state->spill_partitions.pop_front();
         }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bc60d2c376a..042d7aea75e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -809,6 +809,9 @@ Status 
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
                << ", task: " << state->task_id() << ", child eos: " << 
local_state._child_eos;
 
     if (local_state._child_eos) {
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash 
probe node: " << node_id()
+                   << ", task: " << state->task_id() << ", child eos: " << 
local_state._child_eos
+                   << ", will not revoke size: " << revocable_mem_size(state);
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d166d65b4c9..14c7df63262 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -27,6 +27,7 @@
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
 #include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -353,21 +354,20 @@ Status 
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         }
     }
 
+    std::unique_lock<std::mutex> lock(_spill_lock);
     if (_spilling_streams_count > 0) {
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        if (_spilling_streams_count > 0) {
-            _spill_dependency->block();
-        } else if (_child_eos) {
-            LOG(INFO) << "hash join sink " << _parent->node_id() << " 
set_ready_to_read"
-                      << ", task id: " << state->task_id();
-            std::for_each(_shared_state->partitioned_build_blocks.begin(),
-                          _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
-                              if (block) {
-                                  COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
-                              }
-                          });
-            _dependency->set_ready_to_read();
-        }
+        _spill_dependency->block();
+    } else if (_child_eos) {
+        VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join 
sink "
+                   << _parent->node_id() << " set_ready_to_read"
+                   << ", task id: " << state->task_id();
+        std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                      _shared_state->partitioned_build_blocks.end(), [&](auto& 
block) {
+                          if (block) {
+                              COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
+                          }
+                      });
+        _dependency->set_ready_to_read();
     }
     return Status::OK();
 }
@@ -438,8 +438,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
         std::unique_lock<std::mutex> lock(_spill_lock);
         _spill_dependency->set_ready();
         if (_child_eos) {
-            LOG(INFO) << "hash join sink " << _parent->node_id() << " 
set_ready_to_read"
-                      << ", task id: " << state()->task_id();
+            VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << 
", hash join sink "
+                       << _parent->node_id() << " set_ready_to_read"
+                       << ", task id: " << state()->task_id();
             std::for_each(_shared_state->partitioned_build_blocks.begin(),
                           _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
                               if (block) {
@@ -553,8 +554,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
-                      << ", task id: " << state->task_id() << ", need spil: " 
<< need_to_spill;
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash 
join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id() << ", need spill: 
" << need_to_spill;
 
             if (!need_to_spill) {
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) 
{
@@ -596,6 +598,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
         RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, 
rows));
         if (eos) {
             return revoke_memory(state);
+        } else if (revocable_mem_size(state) > 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            return revoke_memory(state);
         }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -613,8 +617,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* 
state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, 
set_ready_to_read"
-                      << ", task id: " << state->task_id();
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash 
join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id();
             local_state._dependency->set_ready_to_read();
         }
     }
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index 07b8bae3fd1..e194b72a852 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -178,7 +178,7 @@ Status RepeatOperatorX::push(RuntimeState* state, 
vectorized::Block* input_block
     auto& _expr_ctxs = local_state._expr_ctxs;
     DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
     if (input_block->rows() > 0) {
-        ScopedMemTracker 
scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         _intermediate_block = vectorized::Block::create_unique();
 
         for (auto& expr : _expr_ctxs) {
@@ -205,7 +205,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* outp
     auto& _intermediate_block = local_state._intermediate_block;
     RETURN_IF_CANCELLED(state);
 
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     DCHECK(_repeat_id_idx >= 0);
     for (const std::vector<int64_t>& v : _grouping_list) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 174d017aa53..a2ed417ed35 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -73,7 +73,7 @@ Status 
SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
-    ScopedMemTracker mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     auto probe_rows = in_block->rows();
     if (probe_rows > 0) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index bab04206d58..05024471585 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -55,7 +55,7 @@ private:
     template <class HashTableContext, bool is_intersected>
     friend struct vectorized::HashTableProbe;
 
-    size_t _estimate_memory_usage = 0;
+    int64_t _estimate_memory_usage = 0;
 
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 6c3260ba850..d1c42ae3c08 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -74,7 +74,7 @@ Status 
SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     _create_mutable_cols(local_state, block);
     auto st = std::visit(
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 47ed935ba8d..cb18019b35c 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,7 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, 
eos));
     local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index b573a736280..caeb32b4155 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1279,7 +1279,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) 
{
 
 Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* 
block, bool* eos) const {
     auto& local_state = get_local_state(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     if (!local_state._pre_aggregated_block->empty()) {
         local_state._pre_aggregated_block->swap(*block);
     } else {
@@ -1296,7 +1296,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, 
vectorized::Block* block
 Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* 
in_block,
                                    bool eos) const {
     auto& local_state = get_local_state(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     local_state._input_num_rows += in_block->rows();
     if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index 24d172f6708..18058c95cae 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -141,7 +141,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* 
state, vectorized::Blo
     auto& local_state = 
state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
     DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
 
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     auto& _const_expr_list_idx = local_state._const_expr_list_idx;
     vectorized::MutableBlock mblock =
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 6c5e018d3ec..d6b5b7504b2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -396,15 +396,22 @@ Status PipelineTask::execute(bool* eos) {
                     bool is_high_wartermark = false;
                     bool is_low_wartermark = false;
                     workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-                    if (is_low_wartermark || is_high_wartermark) {
-                        /// The larger reserved memory size is likely due to a larger available revocable size.
-                        /// If the available memory for revoking is large enough, here trigger revoking proactively.
-                        if (_sink->revocable_mem_size(_state) > 512L * 1024 * 1024) {
-                            LOG(INFO) << "query: " << print_id(query_id)
-                                      << " has big memory to revoke.";
-                            RETURN_IF_ERROR(_sink->revoke_memory(_state));
-                        }
 
+                    /// The larger reserved memory size is likely due to a larger available revocable size.
+                    /// If the available memory for revoking is large enough, here trigger revoking proactively.
+                    bool need_to_pause = false;
+                    const auto revocable_mem_size = _sink->revocable_mem_size(_state);
+                    if (revocable_mem_size > 1024L * 1024 * 1024) {
+                        LOG(INFO) << "query: " << print_id(query_id)
+                                  << ", task id: " << _state->task_id()
+                                  << " has big memory to revoke: " << revocable_mem_size;
+                        RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                        need_to_pause = true;
+                    } else {
+                        need_to_pause = is_low_wartermark || is_high_wartermark;
+                    }
+
+                    if (need_to_pause) {
                         _memory_sufficient_dependency->block();
                         ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                                 _state->get_query_ctx()->shared_from_this());
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8bcce65f229..b84db1c5500 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -90,7 +90,7 @@ Status Block::deserialize(const PBlock& pblock) {
     RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
 
     const char* buf = nullptr;
-    std::string compression_scratch;
+    faststring compression_scratch;
     if (pblock.compressed()) {
         // Decompress
         SCOPED_RAW_TIMER(&_decompress_time_ns);
@@ -113,11 +113,11 @@ Status Block::deserialize(const PBlock& pblock) {
             DCHECK(success) << "snappy::GetUncompressedLength failed";
             compression_scratch.resize(uncompressed_size);
             success = snappy::RawUncompress(compressed_data, compressed_size,
-                                            compression_scratch.data());
+                                            reinterpret_cast<char*>(compression_scratch.data()));
             DCHECK(success) << "snappy::RawUncompress failed";
         }
         _decompressed_bytes = uncompressed_size;
-        buf = compression_scratch.data();
+        buf = reinterpret_cast<char*>(compression_scratch.data());
     } else {
         buf = pblock.column_values().data();
     }
@@ -927,7 +927,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    std::string column_values;
+    faststring column_values;
     try {
         // TODO: After support c++23, we should use resize_and_overwrite to replace resize
         column_values.resize(content_uncompressed_size);
@@ -937,13 +937,14 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         LOG(WARNING) << msg;
         return Status::BufferAllocFailed(msg);
     }
-    char* buf = column_values.data();
+    char* buf = reinterpret_cast<char*>(column_values.data());
 
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
     }
     *uncompressed_bytes = content_uncompressed_size;
-    const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
+    const size_t serialize_bytes =
+            buf - reinterpret_cast<char*>(column_values.data()) + STREAMVBYTE_PADDING;
     *compressed_bytes = serialize_bytes;
     column_values.resize(serialize_bytes);
 
@@ -966,13 +967,13 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
-            pblock->set_column_values(std::move(column_values));
+            pblock->set_column_values(column_values.data(), column_values.size());
         }
 
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     } else {
-        pblock->set_column_values(std::move(column_values));
+        pblock->set_column_values(column_values.data(), column_values.size());
     }
     if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
         return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to