This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 7041d39da9f Spill updated (#40798)
7041d39da9f is described below

commit 7041d39da9f7120b67856c753425c1089199206d
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Sep 13 16:10:09 2024 +0800

    Spill updated (#40798)

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h                       |  3 +-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  8 ++--
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |  4 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |  2 +-
 be/src/pipeline/exec/datagen_operator.cpp          |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  9 -----
 .../exec/nested_loop_join_probe_operator.cpp       |  8 ++--
 be/src/pipeline/exec/operator.h                    | 22 +----------
 .../exec/partitioned_aggregation_sink_operator.h   |  2 +-
 .../partitioned_aggregation_source_operator.cpp    | 23 +++++++++--
 .../exec/partitioned_hash_join_probe_operator.cpp  |  3 ++
 .../exec/partitioned_hash_join_sink_operator.cpp   | 45 ++++++++++++----------
 be/src/pipeline/exec/repeat_operator.cpp           |  4 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |  2 +-
 .../exec/streaming_aggregation_operator.cpp        |  4 +-
 be/src/pipeline/exec/union_source_operator.cpp     |  2 +-
 be/src/pipeline/pipeline_task.cpp                  | 23 +++++++----
 be/src/vec/core/block.cpp                          | 17 ++++----
 25 files changed, 102 insertions(+), 95 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f7990c097ef..547271d87fb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -453,7 +453,8 @@ struct PartitionedAggSharedState : public BasicSharedState,
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
     size_t get_partition_index(size_t hash_value) const {
-        return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        // return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+        return hash_value % partition_count;
     }
 };
 
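For context on the dependency.h hunk above: the spill partition index switches from taking the high bits of the (32-bit) hash to a plain modulo over the partition count. A standalone sketch of the two formulas, assuming partition_count == 1 << partition_count_bits and max_partition_index == partition_count - 1 (those members live elsewhere in PartitionedAggSharedState and are not shown in this diff):

```cpp
#include <cstdint>
#include <cstdio>

int main() {
    const size_t partition_count_bits = 5; // example value; the sink header later in this commit defaults to 5
    const size_t partition_count = size_t(1) << partition_count_bits; // 32
    const size_t max_partition_index = partition_count - 1;

    const size_t hash_value = 0x9E3779B97F4A7C15ULL; // arbitrary hash

    // Old scheme: take the top bits of the low 32 bits of the hash.
    const size_t old_index = (hash_value >> (32 - partition_count_bits)) & max_partition_index;
    // New scheme: plain modulo over the partition count.
    const size_t new_index = hash_value % partition_count;

    std::printf("old=%zu new=%zu\n", old_index, new_index);
    return 0;
}
```

Both map a hash onto [0, partition_count); the modulo form keys off the low bits of the hash instead of the bits just below bit 32.
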
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 166187ffc6d..e1708421b0d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -178,7 +178,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
     SCOPED_TIMER(_build_timer);
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
         RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
                 block,
@@ -191,7 +191,7 @@ Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) {
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _merge_with_serialized_key_helper<true, false>(block);
     } else {
@@ -401,7 +401,7 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
         if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
             int col_id = AggSharedState::get_slot_column_id(
@@ -440,7 +440,7 @@ void AggSinkLocalState::_update_memusage_without_key() {
 
 Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) {
     _memory_usage_last_executing = 0;
-    ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+    SCOPED_PEAK_MEM(&_memory_usage_last_executing);
     if (_shared_state->reach_limit) {
         return _execute_with_serialized_key_helper<true>(block);
     } else {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index ea54281c40c..0071f7bfc03 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -126,7 +126,7 @@ protected:
 
     std::unique_ptr<ExecutorBase> _executor = nullptr;
 
-    size_t _memory_usage_last_executing = 0;
+    int64_t _memory_usage_last_executing = 0;
 };
 
 class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index c1cb187f3e6..6df089bbb5b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -440,7 +440,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
     local_state.make_nullable_output_key(block);
     // dispose the having clause, should not be execute in prestreaming agg
@@ -482,7 +482,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
 template <bool limit>
 Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) {
     SCOPED_TIMER(_merge_timer);
-    ScopedMemTracker scoped_tracker(_estimate_memory_usage);
+    SCOPED_PEAK_MEM(&_estimate_memory_usage);
 
     size_t key_size = Base::_shared_state->probe_expr_ctxs.size();
     vectorized::ColumnRawPtrs key_columns(key_size);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 04f33d2f15b..b7623651cfc 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -269,7 +269,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
 
     local_state._reserve_mem_size = 0;
-    ScopedMemTracker mem_tracker(local_state._reserve_mem_size);
+    SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
 
     local_state._shared_state->input_eos = eos;
     if (local_state._shared_state->input_eos && input_block->rows() == 0) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 9d812e7cc28..93c68539144 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -62,7 +62,7 @@ private:
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
 
-    size_t _reserve_mem_size = 0;
+    int64_t _reserve_mem_size = 0;
 };
 
 class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index caacf06b8ea..0155190f042 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -512,7 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
                                           bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     if (local_state._shared_state->input_eos &&
         (local_state._output_block_index == local_state._shared_state->input_blocks.size() ||
          local_state._shared_state->input_total_rows == 0)) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 6c6a28029e2..f83569b1b34 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -42,7 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
                                     bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     local_state.add_num_rows_returned(block->rows());
     int64_t num_rows_returned = local_state.num_rows_returned();
     bool assert_res = false;
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index ba3ab5e42da..466f16c82fd 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -68,7 +68,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     Status res = local_state._table_func->get_next(state, block, eos);
     RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
     local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6c132649ddb..e4a9e5df72d 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -137,15 +137,6 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
         size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
     }
     size_to_reserve += _evaluate_mem_usage;
-
-    if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
-        LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", rows: " << rows
-                  << ", bucket_size: " << bucket_size
-                  << ", mutable block size: " << _build_side_mutable_block.allocated_bytes()
-                  << ", mutable block cols: " << _build_side_mutable_block.columns()
-                  << ", _build_col_ids.size: " << _build_col_ids.size();
-    }
-
     return size_to_reserve;
 }
 
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index c5642b8a731..9e5964841ef 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -472,7 +472,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
                                           bool eos) const {
     auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
-    ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+    SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
     local_state._cur_probe_row_visited_flags.resize(block->rows());
     std::fill(local_state._cur_probe_row_visited_flags.begin(),
               local_state._cur_probe_row_visited_flags.end(), 0);
@@ -499,12 +499,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
                                           bool* eos) const {
     auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
         *eos = local_state._shared_state->left_side_eos;
         local_state._need_more_input_data = !local_state._shared_state->left_side_eos;
     } else {
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         *eos = ((_match_all_build || _is_right_semi_anti)
                         ? local_state._output_null_idx_build_side ==
                                           local_state._shared_state->build_blocks.size() &&
@@ -537,7 +537,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
                               state, join_op_variants);
         };
         SCOPED_TIMER(local_state._loop_join_timer);
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
         RETURN_IF_ERROR(std::visit(
                 func, local_state._shared_state->join_op_variants,
                 vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti),
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4932504b424..dca3e136526 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -199,7 +199,7 @@ public:
 
     void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }
 
-    size_t& estimate_memory_usage() { return _estimate_memory_usage; }
+    int64_t& estimate_memory_usage() { return _estimate_memory_usage; }
 
     void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
 
@@ -210,7 +210,7 @@ protected:
    ObjectPool* _pool = nullptr;
    int64_t _num_rows_returned {0};
 
-    size_t _estimate_memory_usage {0};
+    int64_t _estimate_memory_usage {0};
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
@@ -243,24 +243,6 @@ protected:
     vectorized::Block _origin_block;
 };
 
-class ScopedMemTracker {
-public:
-    ScopedMemTracker(size_t& counter) : _counter(counter), _mem_tracker("ScopedMemTracker") {
-        thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker);
-        _peak_usage = _mem_tracker.peak_consumption();
-    }
-
-    ~ScopedMemTracker() {
-        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
-        _counter += (_mem_tracker.peak_consumption() - _peak_usage);
-    }
-
-private:
-    size_t& _counter;
-    size_t _peak_usage = 0;
-    MemTracker _mem_tracker;
-};
-
 template <typename SharedStateArg = FakeSharedState>
 class PipelineXLocalState : public PipelineXLocalStateBase {
 public:
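The class removed above is what the new SCOPED_PEAK_MEM() call sites replace throughout this commit. The macro's actual definition is not part of this diff; the following is only a sketch of the assumed contract, an RAII guard that adds the scope's growth in peak memory consumption to an int64_t counter passed by pointer, written with the same thread-context calls the deleted ScopedMemTracker used. The ScopedPeakMemSketch name is hypothetical.

```cpp
// Hypothetical illustration only: SCOPED_PEAK_MEM's real definition lives elsewhere
// in the Doris BE and may differ. It is assumed to behave like the removed
// ScopedMemTracker, but taking an int64_t* instead of a size_t&.
class ScopedPeakMemSketch {
public:
    explicit ScopedPeakMemSketch(int64_t* counter)
            : _counter(counter), _mem_tracker("ScopedPeakMemSketch") {
        // Track allocations made by this thread while the guard is alive.
        thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker);
        _baseline_peak = _mem_tracker.peak_consumption();
    }

    ~ScopedPeakMemSketch() {
        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
        // Record how much the peak grew inside the scope.
        *_counter += _mem_tracker.peak_consumption() - _baseline_peak;
    }

private:
    int64_t* _counter;
    int64_t _baseline_peak = 0;
    MemTracker _mem_tracker;
};

// #define SCOPED_PEAK_MEM(counter) ScopedPeakMemSketch scoped_peak_mem_sketch(counter)
```

That pointer-based, int64_t signature is presumably also why the per-operator counters touched in this commit change from size_t to int64_t.
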
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 4bf2e41befc..378ca3b1d20 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -348,6 +348,6 @@ private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
 
-    size_t _spill_partition_count_bits = 6;
+    size_t _spill_partition_count_bits = 5;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 333f98a66cb..d83af9f6c4e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -207,7 +207,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     _is_merging = true;
     VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
-               << " merge spilled agg data";
+               << ", task id: " << _state->task_id() << " merge spilled agg data";
     RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
     _spill_dependency->Dependency::block();
 
@@ -218,6 +218,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
     submit_timer.start();
     auto spill_func = [this, state, query_id, submit_timer] {
         _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        MonotonicStopWatch execution_timer;
+        execution_timer.start();
+        size_t read_size = 0;
         Defer defer {[&]() {
             if (!_status.ok() || state->is_cancelled()) {
                 if (!_status.ok()) {
@@ -226,9 +229,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                                  << " merge spilled agg data error: " << _status;
                 }
                 _shared_state->close();
-            } else if (_shared_state->spill_partitions.empty()) {
+            } else {
                 VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
-                           << " merge spilled agg data finish";
+                           << ", task id: " << _state->task_id()
+                           << " merge spilled agg data finish, time used: "
+                           << (execution_timer.elapsed_time() / (1000L * 1000 * 1000))
+                           << "s, read size: " << read_size << ", "
+                           << _shared_state->spill_partitions.size() << " partitions left";
             }
             Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
             _is_merging = false;
@@ -261,6 +268,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
                     if (!block.empty()) {
                         has_agg_data = true;
+                        read_size += block.bytes();
                         _status = parent._agg_source_operator
                                           ->merge_with_serialized_key_helper<false>(
                                                   _runtime_state.get(), &block);
@@ -268,6 +276,15 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
                     }
                 }
                 (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+
+                if (!has_agg_data) {
+                    VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
+                               << _parent->node_id() << ", task id: " << _state->task_id()
+                               << " merge spilled agg data finish, time used: "
+                               << execution_timer.elapsed_time() << ", empty partition "
+                               << read_size << ", " << _shared_state->spill_partitions.size()
+                               << " partitions left";
+                }
             }
             _shared_state->spill_partitions.pop_front();
         }
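One implication of the partitioned_aggregation_sink_operator.h hunk above, assuming the spill partition count is derived as 1 << _spill_partition_count_bits (the derivation itself is not visible in this diff): lowering the bits from 6 to 5 halves the number of spill partitions.

```cpp
#include <cstdio>

int main() {
    // Assumed relationship: spill partition count = 1 << _spill_partition_count_bits.
    std::printf("bits=6 -> %d partitions (old)\n", 1 << 6); // 64
    std::printf("bits=5 -> %d partitions (new)\n", 1 << 5); // 32
    return 0;
}
```
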
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bc60d2c376a..042d7aea75e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -809,6 +809,9 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
                << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;
 
     if (local_state._child_eos) {
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+                   << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos
+                   << ", will not revoke size: " << revocable_mem_size(state);
         return Status::OK();
     }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d166d65b4c9..14c7df63262 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -27,6 +27,7 @@
 #include "runtime/fragment_mgr.h"
 #include "util/mem_info.h"
 #include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
@@ -353,21 +354,20 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         }
     }
 
+    std::unique_lock<std::mutex> lock(_spill_lock);
     if (_spilling_streams_count > 0) {
-        std::unique_lock<std::mutex> lock(_spill_lock);
-        if (_spilling_streams_count > 0) {
-            _spill_dependency->block();
-        } else if (_child_eos) {
-            LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
-                      << ", task id: " << state->task_id();
-            std::for_each(_shared_state->partitioned_build_blocks.begin(),
-                          _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
-                              if (block) {
-                                  COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                              }
-                          });
-            _dependency->set_ready_to_read();
-        }
+        _spill_dependency->block();
+    } else if (_child_eos) {
+        VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink "
+                   << _parent->node_id() << " set_ready_to_read"
+                   << ", task id: " << state->task_id();
+        std::for_each(_shared_state->partitioned_build_blocks.begin(),
+                      _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+                          if (block) {
+                              COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+                          }
+                      });
+        _dependency->set_ready_to_read();
     }
 
     return Status::OK();
 }
@@ -438,8 +438,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
     std::unique_lock<std::mutex> lock(_spill_lock);
     _spill_dependency->set_ready();
     if (_child_eos) {
-        LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
-                  << ", task id: " << state()->task_id();
+        VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
+                   << _parent->node_id() << " set_ready_to_read"
+                   << ", task id: " << state()->task_id();
         std::for_each(_shared_state->partitioned_build_blocks.begin(),
                       _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
                           if (block) {
@@ -553,8 +554,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     const auto need_to_spill = local_state._shared_state->need_to_spill;
     if (rows == 0) {
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
-                      << ", task id: " << state->task_id() << ", need spil: " << need_to_spill;
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id() << ", need spill: " << need_to_spill;
 
             if (!need_to_spill) {
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -596,6 +598,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
         if (eos) {
             return revoke_memory(state);
+        } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            return revoke_memory(state);
         }
     } else {
         if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -613,8 +617,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
 
         if (eos) {
-            LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
-                      << ", task id: " << state->task_id();
+            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+                       << node_id() << " sink eos, set_ready_to_read"
+                       << ", task id: " << state->task_id();
             local_state._dependency->set_ready_to_read();
         }
     }
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index 07b8bae3fd1..e194b72a852 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -178,7 +178,7 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block
     auto& _expr_ctxs = local_state._expr_ctxs;
     DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
     if (input_block->rows() > 0) {
-        ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
         _intermediate_block = vectorized::Block::create_unique();
         for (auto& expr : _expr_ctxs) {
@@ -205,7 +205,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
     auto& _intermediate_block = local_state._intermediate_block;
 
     RETURN_IF_CANCELLED(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     DCHECK(_repeat_id_idx >= 0);
     for (const std::vector<int64_t>& v : _grouping_list) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 174d017aa53..a2ed417ed35 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -73,7 +73,7 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
-    ScopedMemTracker mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     auto probe_rows = in_block->rows();
     if (probe_rows > 0) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index bab04206d58..05024471585 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -55,7 +55,7 @@ private:
     template <class HashTableContext, bool is_intersected>
     friend struct vectorized::HashTableProbe;
 
-    size_t _estimate_memory_usage = 0;
+    int64_t _estimate_memory_usage = 0;
 
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index 6c3260ba850..d1c42ae3c08 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -74,7 +74,7 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     _create_mutable_cols(local_state, block);
     auto st = std::visit(
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp
index 47ed935ba8d..cb18019b35c 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,7 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos));
     local_state.reached_limit(block, eos);
 
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index b573a736280..caeb32b4155 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1279,7 +1279,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) {
 Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block,
                                    bool* eos) const {
     auto& local_state = get_local_state(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     if (!local_state._pre_aggregated_block->empty()) {
         local_state._pre_aggregated_block->swap(*block);
     } else {
@@ -1296,7 +1296,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block
 Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block,
                                    bool eos) const {
     auto& local_state = get_local_state(state);
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
     local_state._input_num_rows += in_block->rows();
 
     if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 24d172f6708..18058c95cae 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -141,7 +141,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo
     auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
     DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
 
-    ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
 
     auto& _const_expr_list_idx = local_state._const_expr_list_idx;
     vectorized::MutableBlock mblock =
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6c5e018d3ec..d6b5b7504b2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -396,15 +396,22 @@ Status PipelineTask::execute(bool* eos) {
                     bool is_high_wartermark = false;
                     bool is_low_wartermark = false;
                     workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-                    if (is_low_wartermark || is_high_wartermark) {
-                        /// The larger reserved memory size is likely due to a larger available revocable size.
-                        /// If the available memory for revoking is large enough, here trigger revoking proactively.
-                        if (_sink->revocable_mem_size(_state) > 512L * 1024 * 1024) {
-                            LOG(INFO) << "query: " << print_id(query_id)
-                                      << " has big memory to revoke.";
-                            RETURN_IF_ERROR(_sink->revoke_memory(_state));
-                        }
+                    /// The larger reserved memory size is likely due to a larger available revocable size.
+                    /// If the available memory for revoking is large enough, here trigger revoking proactively.
+                    bool need_to_pause = false;
+                    const auto revocable_mem_size = _sink->revocable_mem_size(_state);
+                    if (revocable_mem_size > 1024L * 1024 * 1024) {
+                        LOG(INFO) << "query: " << print_id(query_id)
+                                  << ", task id: " << _state->task_id()
+                                  << " has big memory to revoke: " << revocable_mem_size;
+                        RETURN_IF_ERROR(_sink->revoke_memory(_state));
+                        need_to_pause = true;
+                    } else {
+                        need_to_pause = is_low_wartermark || is_high_wartermark;
+                    }
+
+                    if (need_to_pause) {
                         _memory_sufficient_dependency->block();
                         ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                                 _state->get_query_ctx()->shared_from_this());
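Ahead of the be/src/vec/core/block.cpp hunks below: the serialize/deserialize scratch buffers there move from std::string to faststring, whose data() returns uint8_t* rather than char* (hence the added reinterpret_casts). A minimal sketch of the resulting write pattern, assuming the util/faststring.h header path and that resize()/data()/size() behave as they are used in the diff; fill_scratch and the example payload are made up for illustration:

```cpp
#include "util/faststring.h" // assumed header path for doris::faststring

#include <cstring>

// Size the scratch buffer up front, write through a char* view of it, then
// shrink it to the number of bytes actually produced. This mirrors the shape
// of the updated Block::serialize() code path.
size_t fill_scratch(doris::faststring* column_values, size_t content_uncompressed_size) {
    column_values->resize(content_uncompressed_size);
    char* buf = reinterpret_cast<char*>(column_values->data());

    // Stand-in for the per-column type->serialize() loop.
    const char payload[] = "example payload";
    std::memcpy(buf, payload, sizeof(payload));
    buf += sizeof(payload);

    const size_t serialize_bytes = buf - reinterpret_cast<char*>(column_values->data());
    column_values->resize(serialize_bytes); // keep only what was written
    return serialize_bytes;
}
```

Note that pblock->set_column_values(column_values.data(), column_values.size()) now copies into the protobuf field where the old std::string could be std::moved in; presumably the win is faststring's cheaper resize behavior for these large scratch buffers, though the rationale is not stated in the commit.
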
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8bcce65f229..b84db1c5500 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -90,7 +90,7 @@ Status Block::deserialize(const PBlock& pblock) {
     RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
 
     const char* buf = nullptr;
-    std::string compression_scratch;
+    faststring compression_scratch;
     if (pblock.compressed()) {
         // Decompress
         SCOPED_RAW_TIMER(&_decompress_time_ns);
@@ -113,11 +113,11 @@ Status Block::deserialize(const PBlock& pblock) {
             DCHECK(success) << "snappy::GetUncompressedLength failed";
             compression_scratch.resize(uncompressed_size);
             success = snappy::RawUncompress(compressed_data, compressed_size,
-                                            compression_scratch.data());
+                                            reinterpret_cast<char*>(compression_scratch.data()));
             DCHECK(success) << "snappy::RawUncompress failed";
         }
         _decompressed_bytes = uncompressed_size;
-        buf = compression_scratch.data();
+        buf = reinterpret_cast<char*>(compression_scratch.data());
     } else {
         buf = pblock.column_values().data();
     }
@@ -927,7 +927,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    std::string column_values;
+    faststring column_values;
     try {
         // TODO: After support c++23, we should use resize_and_overwrite to replace resize
         column_values.resize(content_uncompressed_size);
@@ -937,13 +937,14 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         LOG(WARNING) << msg;
         return Status::BufferAllocFailed(msg);
     }
-    char* buf = column_values.data();
+    char* buf = reinterpret_cast<char*>(column_values.data());
 
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
     }
     *uncompressed_bytes = content_uncompressed_size;
-    const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
+    const size_t serialize_bytes =
+            buf - reinterpret_cast<char*>(column_values.data()) + STREAMVBYTE_PADDING;
     *compressed_bytes = serialize_bytes;
     column_values.resize(serialize_bytes);
 
@@ -966,13 +967,13 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
-            pblock->set_column_values(std::move(column_values));
+            pblock->set_column_values(column_values.data(), column_values.size());
         }
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     } else {
-        pblock->set_column_values(std::move(column_values));
+        pblock->set_column_values(column_values.data(), column_values.size());
     }
     if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
         return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org