This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new e5fd336badf Use DorisVector in memtable; improve low memory mode (#46929) e5fd336badf is described below commit e5fd336badf5d689b24157d92cd9ca1b39f4d718 Author: TengJianPing <tengjianp...@selectdb.com> AuthorDate: Tue Jan 14 15:38:08 2025 +0800 Use DorisVector in memtable; improve low memory mode (#46929) --- be/src/cloud/cloud_delta_writer.cpp | 2 +- be/src/cloud/cloud_delta_writer.h | 2 +- be/src/cloud/cloud_tablets_channel.cpp | 2 +- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/delta_writer.h | 4 +-- be/src/olap/delta_writer_v2.cpp | 2 +- be/src/olap/delta_writer_v2.h | 2 +- be/src/olap/memtable.cpp | 12 ++++---- be/src/olap/memtable.h | 10 +++--- be/src/olap/memtable_writer.cpp | 2 +- be/src/olap/memtable_writer.h | 2 +- be/src/olap/rowset/segment_creator.cpp | 4 ++- .../rowset/segment_v2/inverted_index_reader.cpp | 9 +++--- be/src/pipeline/dependency.h | 4 +-- be/src/pipeline/exec/data_queue.cpp | 14 +++++++-- be/src/pipeline/exec/data_queue.h | 11 +++++-- be/src/pipeline/exec/exchange_sink_operator.cpp | 7 ++--- be/src/pipeline/exec/exchange_sink_operator.h | 10 ++++++ be/src/pipeline/exec/olap_table_sink_v2_operator.h | 5 +++ be/src/pipeline/exec/operator.h | 2 ++ .../exec/partitioned_hash_join_probe_operator.cpp | 2 +- be/src/pipeline/exec/scan_operator.h | 8 +++-- .../exec/streaming_aggregation_operator.cpp | 14 ++++----- .../pipeline/exec/streaming_aggregation_operator.h | 7 +++-- be/src/pipeline/exec/union_sink_operator.cpp | 2 +- be/src/pipeline/exec/union_sink_operator.h | 5 +++ be/src/pipeline/exec/union_source_operator.cpp | 5 +-- be/src/pipeline/exec/union_source_operator.h | 7 +++++ .../local_exchange_sink_operator.cpp | 5 +-- .../local_exchange/local_exchange_sink_operator.h | 7 +++++ be/src/pipeline/local_exchange/local_exchanger.cpp | 9 ------ be/src/pipeline/local_exchange/local_exchanger.h | 12 ++++++-- be/src/pipeline/pipeline_task.cpp | 9 ++++-- be/src/runtime/memory/memory_profile.cpp | 12 ++------ be/src/runtime/query_context.h | 8 +++++ be/src/runtime/runtime_state.cpp | 1 - be/src/runtime/runtime_state.h | 4 --- be/src/runtime/tablets_channel.cpp | 6 ++-- be/src/runtime/tablets_channel.h | 5 +-- .../workload_group/workload_group_manager.cpp | 36 ++++++++++++++++------ be/src/vec/core/block.cpp | 10 ++++++ be/src/vec/exec/scan/scanner_context.cpp | 6 +--- be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +- be/src/vec/sink/vrow_distribution.h | 6 ++-- be/src/vec/sink/writer/async_result_writer.cpp | 5 +++ be/src/vec/sink/writer/async_result_writer.h | 2 ++ be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- 47 files changed, 192 insertions(+), 113 deletions(-) diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index 7beaeb3e086..a1c5e08f2c8 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -61,7 +61,7 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) { } Status CloudDeltaWriter::write(const vectorized::Block* block, - const std::vector<uint32_t>& row_idxs) { + const DorisVector<uint32_t>& row_idxs) { if (row_idxs.empty()) [[unlikely]] { return Status::OK(); } diff --git a/be/src/cloud/cloud_delta_writer.h b/be/src/cloud/cloud_delta_writer.h index 4558b04acd1..e97442d137f 100644 --- a/be/src/cloud/cloud_delta_writer.h +++ b/be/src/cloud/cloud_delta_writer.h @@ -32,7 +32,7 @@ public: const UniqueId& load_id); ~CloudDeltaWriter() override; - Status write(const 
vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override; + Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override; Status close() override; diff --git a/be/src/cloud/cloud_tablets_channel.cpp b/be/src/cloud/cloud_tablets_channel.cpp index 85b8e3ea33a..63e47b69d06 100644 --- a/be/src/cloud/cloud_tablets_channel.cpp +++ b/be/src/cloud/cloud_tablets_channel.cpp @@ -55,7 +55,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques return Status::OK(); } - std::unordered_map<int64_t, std::vector<uint32_t>> tablet_to_rowidxs; + std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs; _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); std::unordered_set<int64_t> partition_ids; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 88277775f96..6718f877af9 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -118,7 +118,7 @@ Status BaseDeltaWriter::init() { return Status::OK(); } -Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) { +Status DeltaWriter::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) { if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 20f693d5e4d..550960f3f1e 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -64,7 +64,7 @@ public: virtual ~BaseDeltaWriter(); - virtual Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) = 0; + virtual Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) = 0; // flush the last memtable to flush queue, must call it before build_rowset() virtual Status close() = 0; @@ -123,7 +123,7 @@ public: ~DeltaWriter() override; - Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override; + Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override; Status close() override; diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index a6fb0154489..e298e1aea39 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -139,7 +139,7 @@ Status DeltaWriterV2::init() { return Status::OK(); } -Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) { +Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) { if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index f9c2800a68f..b813ec228a0 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -71,7 +71,7 @@ public: Status init(); - Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); + Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs); // flush the last memtable to flush queue, must call it before close_wait() Status close(); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 62d9ecf44a9..8593f1ef482 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -182,7 +182,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r } Status MemTable::insert(const vectorized::Block* input_block, - const std::vector<uint32_t>& row_idxs) { + const DorisVector<uint32_t>& row_idxs) { 
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( _query_thread_context.query_mem_tracker->write_tracker()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); @@ -282,7 +282,7 @@ void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo } Status MemTable::_put_into_output(vectorized::Block& in_block) { SCOPED_RAW_TIMER(&_stat.put_into_output_ns); - std::vector<uint32_t> row_pos_vec; + DorisVector<uint32_t> row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < _row_in_blocks.size(); i++) { @@ -343,7 +343,7 @@ Status MemTable::_sort_by_cluster_keys() { auto clone_block = in_block.clone_without_columns(); _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); - std::vector<RowInBlock*> row_in_blocks; + DorisVector<RowInBlock*> row_in_blocks; std::unique_ptr<int, std::function<void(int*)>> row_in_blocks_deleter((int*)0x01, [&](int*) { std::for_each(row_in_blocks.begin(), row_in_blocks.end(), std::default_delete<RowInBlock>()); @@ -378,7 +378,7 @@ Status MemTable::_sort_by_cluster_keys() { in_block = mutable_block.to_block(); SCOPED_RAW_TIMER(&_stat.put_into_output_ns); - std::vector<uint32_t> row_pos_vec; + DorisVector<uint32_t> row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < row_in_blocks.size(); i++) { @@ -392,7 +392,7 @@ Status MemTable::_sort_by_cluster_keys() { row_pos_vec.data() + in_block.rows(), &column_offset); } -void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, +void MemTable::_sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie, std::function<int(const RowInBlock*, const RowInBlock*)> cmp) { auto iter = tie.iter(); while (iter.next()) { @@ -464,7 +464,7 @@ void MemTable::_aggregate() { vectorized::MutableBlock::build_mutable_block(&in_block); _vec_row_comparator->set_block(&mutable_block); auto& block_data = in_block.get_columns_with_type_and_name(); - std::vector<RowInBlock*> temp_row_in_blocks; + DorisVector<RowInBlock*> temp_row_in_blocks; temp_row_in_blocks.reserve(_last_sorted_pos); RowInBlock* prev_row = nullptr; int row_pos = -1; diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 09591df2745..f4b1de45272 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -183,7 +183,7 @@ public: size_t memory_usage() const { return _mem_tracker->consumption(); } size_t get_flush_reserve_memory_size() const; // insert tuple from (row_pos) to (row_pos+num_rows) - Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); + Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs); void shrink_memtable_by_agg(); @@ -252,7 +252,7 @@ private: //return number of same keys size_t _sort(); Status _sort_by_cluster_keys(); - void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, + void _sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie, std::function<int(const RowInBlock*, const RowInBlock*)> cmp); template <bool is_final> void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, @@ -263,10 +263,10 @@ private: bool _is_first_insertion; void _init_agg_functions(const vectorized::Block* block); - std::vector<vectorized::AggregateFunctionPtr> _agg_functions; - std::vector<size_t> _offsets_of_aggregate_states; + DorisVector<vectorized::AggregateFunctionPtr> _agg_functions; + DorisVector<size_t> 
_offsets_of_aggregate_states; size_t _total_size_of_aggregate_states; - std::vector<RowInBlock*> _row_in_blocks; + DorisVector<RowInBlock*> _row_in_blocks; size_t _num_columns; int32_t _seq_col_idx_in_block = -1; diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 88532646b66..6d63890eede 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -86,7 +86,7 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer, } Status MemTableWriter::write(const vectorized::Block* block, - const std::vector<uint32_t>& row_idxs) { + const DorisVector<uint32_t>& row_idxs) { if (UNLIKELY(row_idxs.empty())) { return Status::OK(); } diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index 924c3735310..c8c72b54e3f 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -69,7 +69,7 @@ public: std::shared_ptr<PartialUpdateInfo> partial_update_info, std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = false); - Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); + Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs); // flush the last memtable to flush queue, must call it before close_wait() Status close(); diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp index e0eb7534123..1073e32395e 100644 --- a/be/src/olap/rowset/segment_creator.cpp +++ b/be/src/olap/rowset/segment_creator.cpp @@ -37,6 +37,7 @@ #include "olap/rowset/segment_v2/vertical_segment_writer.h" #include "olap/tablet_schema.h" #include "olap/utils.h" +#include "util/pretty_printer.h" #include "vec/columns/column.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_object.h" @@ -254,7 +255,8 @@ Status SegmentFlusher::_flush_segment_writer( segstat.key_bounds = key_bounds; LOG(INFO) << "tablet_id:" << _context.tablet_id << ", flushing rowset_dir: " << _context.tablet_path - << ", rowset_id:" << _context.rowset_id << ", data size:" << segstat.data_size + << ", rowset_id:" << _context.rowset_id + << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size) << ", index size:" << segstat.index_size; writer.reset(); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index 44c038aec9c..27dcb32dd99 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -1158,11 +1158,10 @@ Status InvertedIndexIterator::read_from_inverted_index( RETURN_IF_ERROR( try_read_from_inverted_index(column_name, query_value, query_type, &hit_count)); if (hit_count > segment_num_rows * query_bkd_limit_percent / 100) { - return Status:: - Error<ErrorCode::INVERTED_INDEX_BYPASS>( - "hit count: {}, bkd inverted reached limit {}% , segment num " - "rows:{}", // add blackspace after % to avoid log4j format bug - hit_count, query_bkd_limit_percent, segment_num_rows); + return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>( + "hit count: {}, bkd inverted reached limit {}% , segment num " + "rows:{}", // add blackspace after % to avoid log4j format bug + hit_count, query_bkd_limit_percent, segment_num_rows); } } } diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 9ce85bcb7b5..e010ad7e6cd 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -779,7 +779,7 @@ public: std::unique_ptr<ExchangerBase> exchanger {}; 
std::vector<RuntimeProfile::Counter*> mem_counters; std::atomic<int64_t> mem_usage = 0; - size_t _buffer_mem_limit = config::local_exchange_buffer_mem_limit; + std::atomic<size_t> _buffer_mem_limit = config::local_exchange_buffer_mem_limit; // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. std::mutex le_lock; virtual void create_dependencies(int local_exchange_id) { @@ -906,7 +906,7 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState { private: std::vector<std::atomic_int64_t> _queues_mem_usage; - int64_t _each_queue_limit; + std::atomic_int64_t _each_queue_limit; }; #include "common/compile_check_end.h" } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 85354ece76a..97a3293004f 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -68,8 +68,18 @@ std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) { void DataQueue::push_free_block(std::unique_ptr<vectorized::Block> block, int child_idx) { DCHECK(block->rows() == 0); - std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]); - _free_blocks[child_idx].emplace_back(std::move(block)); + if (!_is_low_memory_mode) { + std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]); + _free_blocks[child_idx].emplace_back(std::move(block)); + } +} + +void DataQueue::clear_free_blocks() { + for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) { + std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]); + std::deque<std::unique_ptr<vectorized::Block>> tmp_queue; + _free_blocks[child_idx].swap(tmp_queue); + } } //check which queue have data, and save the idx in _flag_queue_idx, diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index aabe0ba8797..1548c34addf 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -48,6 +48,8 @@ public: void push_free_block(std::unique_ptr<vectorized::Block> output_block, int child_idx = 0); + void clear_free_blocks(); + void set_finish(int child_idx = 0); void set_canceled(int child_idx = 0); // should set before finish bool is_finish(int child_idx = 0); @@ -74,7 +76,11 @@ public: void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; } - void set_low_memory_mode() { _max_blocks_in_sub_queue = 1; } + void set_low_memory_mode() { + _is_low_memory_mode = true; + _max_blocks_in_sub_queue = 1; + clear_free_blocks(); + } private: std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock; @@ -99,7 +105,8 @@ private: // only used by streaming agg source operator bool _data_exhausted = false; - int64_t _max_blocks_in_sub_queue = 1; + std::atomic_bool _is_low_memory_mode = false; + std::atomic_int64_t _max_blocks_in_sub_queue = 1; //this only use to record the queue[0] for profile int64_t _max_bytes_in_queue = 0; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 3ac73813021..573f4aa840e 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -356,8 +356,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } // When `local_state.only_local_exchange` the `sink_buffer` is nullptr. 
- if (state->get_query_ctx()->low_memory_mode() && local_state._sink_buffer != nullptr) { - local_state._sink_buffer->set_low_memory_mode(); + if (state->get_query_ctx()->low_memory_mode()) { + set_low_memory_mode(state); } if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { @@ -398,9 +398,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block block_holder->reset_block(); } - if (state->get_query_ctx()->low_memory_mode()) { - local_state._broadcast_pb_mem_limiter->set_low_memory_mode(); - } local_state._broadcast_pb_mem_limiter->acquire(*block_holder); size_t idx = 0; diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 3d6eeb4b39e..d7030ae242a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -199,6 +199,16 @@ public: DataDistribution required_data_distribution() const override; bool is_serial_operator() const override { return true; } + void set_low_memory_mode(RuntimeState* state) override { + auto& local_state = get_local_state(state); + // When `local_state.only_local_exchange` the `sink_buffer` is nullptr. + if (local_state._sink_buffer) { + local_state._sink_buffer->set_low_memory_mode(); + } + if (local_state._broadcast_pb_mem_limiter) { + local_state._broadcast_pb_mem_limiter->set_low_memory_mode(); + } + } // For a normal shuffle scenario, if the concurrency is n, // there can be up to n * n RPCs in the current fragment. diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index 4b55ec09efa..da5986d09ee 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -66,6 +66,11 @@ public: return local_state.sink(state, in_block, eos); } + void set_low_memory_mode(RuntimeState* state) override { + auto& local_state = get_local_state(state); + local_state._writer->clear_free_blocks(); + } + private: friend class OlapTableSinkV2LocalState; template <typename Writer, typename Parent> diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 84327f8b5a6..7d1236b85b1 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -118,6 +118,8 @@ public: return Status::OK(); } + virtual void set_low_memory_mode(RuntimeState* state) {} + [[nodiscard]] virtual bool require_data_distribution() const { return false; } OperatorPtr child() { return _child; } [[nodiscard]] bool followed_by_shuffled_operator() const { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 0588afcca0a..120c6bcbd06 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -870,7 +870,7 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat static_cast<int64_t>( vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM)); } else { - return vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM; + return revocable_size > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM; } } return false; diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index d099ccdd12f..d6cd2806db5 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -17,8 +17,6 @@ #pragma once -#include <stdint.h> - #include 
<cstdint> #include <string> @@ -29,6 +27,7 @@ #include "pipeline/dependency.h" #include "runtime/descriptors.h" #include "runtime/types.h" +#include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vscan_node.h" #include "vec/exprs/vectorized_fn_call.h" #include "vec/exprs/vin_predicate.h" @@ -388,6 +387,11 @@ public: return {ExchangeType::BUCKET_HASH_SHUFFLE}; } + void set_low_memory_mode(RuntimeState* state) override { + auto& local_state = get_local_state(state); + local_state._scanner_ctx->clear_free_blocks(); + } + int64_t get_push_down_count() const { return _push_down_count; } using OperatorX<LocalStateType>::node_id; using OperatorX<LocalStateType>::operator_id; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index e0729db9da6..81769663425 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -497,14 +497,11 @@ Status StreamingAggLocalState::_init_hash_method(const vectorized::VExprContextS return Status::OK(); } -void StreamingAggLocalState::set_low_memory_mode() { - auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); - p._spill_streaming_agg_mem_limit = 1024 * 1024; -} -Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block, +Status StreamingAggLocalState::do_pre_agg(RuntimeState* state, vectorized::Block* input_block, vectorized::Block* output_block) { - if (state()->get_query_ctx()->low_memory_mode()) { - set_low_memory_mode(); + if (state->get_query_ctx()->low_memory_mode()) { + auto& p = Base::_parent->template cast<StreamingAggOperatorX>(); + p.set_low_memory_mode(state); } RETURN_IF_ERROR(_pre_agg_with_serialized_key(input_block, output_block)); @@ -1288,7 +1285,8 @@ Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_bl local_state._input_num_rows += in_block->rows(); if (in_block->rows() > 0) { - RETURN_IF_ERROR(local_state.do_pre_agg(in_block, local_state._pre_aggregated_block.get())); + RETURN_IF_ERROR( + local_state.do_pre_agg(state, in_block, local_state._pre_aggregated_block.get())); } in_block->clear_column_data(_child->row_desc().num_materialized_slots()); return Status::OK(); diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 7d85d092d17..3ee52eeb6ec 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -45,9 +45,9 @@ public: Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* output_block); + Status do_pre_agg(RuntimeState* state, vectorized::Block* input_block, + vectorized::Block* output_block); void make_nullable_output_key(vectorized::Block* block); - void set_low_memory_mode(); private: friend class StreamingAggOperatorX; @@ -209,6 +209,9 @@ public: Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override; bool need_more_input_data(RuntimeState* state) const override; + void set_low_memory_mode(RuntimeState* state) override { + _spill_streaming_agg_mem_limit = 1024 * 1024; + } private: friend class StreamingAggLocalState; diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp index 39ca43a9d1d..7a0766e0f92 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -92,7 +92,7 @@ Status UnionSinkOperatorX::open(RuntimeState* state) { Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); if (state->get_query_ctx()->low_memory_mode()) { - local_state._shared_state->data_queue.set_low_memory_mode(); + set_low_memory_mode(state); } SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 3a8880622cb..9f9cdb58433 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -95,6 +95,11 @@ public: return _followed_by_shuffled_operator; } + void set_low_memory_mode(RuntimeState* state) override { + auto& local_state = get_local_state(state); + local_state._shared_state->data_queue.set_low_memory_mode(); + } + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 470914967ae..6412569bc4b 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -131,10 +131,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b } block->swap(*output_block); output_block->clear_column_data(_row_descriptor.num_materialized_slots()); - if (!state->get_query_ctx()->low_memory_mode()) { - local_state._shared_state->data_queue.push_free_block(std::move(output_block), - child_idx); - } + local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx); } local_state.reached_limit(block, eos); return Status::OK(); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 0ee66c3da74..e3a119354c1 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -102,6 +102,13 @@ public: return _followed_by_shuffled_operator; } + void set_low_memory_mode(RuntimeState* state) override { + auto& local_state = get_local_state(state); + if (local_state._shared_state) { + local_state._shared_state->data_queue.set_low_memory_mode(); + } + } + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 084d33e456f..a1c535c487b 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -146,10 +146,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (state->get_query_ctx()->low_memory_mode()) { - local_state._shared_state->set_low_memory_mode(state); - local_state._exchanger->set_low_memory_mode(); + set_low_memory_mode(state); } - RETURN_IF_ERROR(local_state._exchanger->sink( state, in_block, eos, {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr}, @@ -162,5 +160,4 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* return Status::OK(); } - } // namespace doris::pipeline diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index c067f023c8d..160d6b363cf 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -108,6 +108,13 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; + void set_low_memory_mode(RuntimeState* state) override { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); + local_state._shared_state->set_low_memory_mode(state); + local_state._exchanger->set_low_memory_mode(); + } + private: friend class LocalExchangeSinkLocalState; friend class ShuffleExchanger; diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index b31dbd7a308..6e30297e2b2 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -453,15 +453,6 @@ void ExchangerBase::finalize() { } } -void ExchangerBase::set_low_memory_mode() { - _free_block_limit = 0; - - vectorized::Block block; - while (_free_blocks.try_dequeue(block)) { - // do nothing - } -} - void LocalMergeSortExchanger::finalize() { BlockWrapperSPtr next_block; vectorized::Block block; diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 3a4bccf1f48..5684b418ff2 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -23,8 +23,11 @@ namespace doris { #include "common/compile_check_begin.h" namespace vectorized { +template <typename T> +void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks); + class PartitionerBase; -} +} // namespace vectorized namespace pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; @@ -91,7 +94,10 @@ public: virtual std::string data_queue_debug_string(int i) = 0; - void set_low_memory_mode(); + void set_low_memory_mode() { + _free_block_limit = 0; + clear_blocks(_free_blocks); + } protected: friend struct LocalExchangeSharedState; @@ -104,7 +110,7 @@ protected: const int _num_partitions; const int _num_senders; const int _num_sources; - int _free_block_limit = 0; + std::atomic_int _free_block_limit = 0; moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks; }; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index ed40731bfd1..e09da6ab742 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -403,12 +403,17 @@ Status PipelineTask::execute(bool* eos) { *eos = _pending_eos; } else { SCOPED_TIMER(_get_block_timer); + if (_state->get_query_ctx()->low_memory_mode()) { + _sink->set_low_memory_mode(_state); + _root->set_low_memory_mode(_state); + } DEFER_RELEASE_RESERVED(); _get_block_counter->update(1); const auto reserve_size = _root->get_reserve_mem_size(_state); _root->reset_reserve_mem_size(_state); - if (workload_group && _state->enable_reserve_memory() && reserve_size > 0) { + if (workload_group && _state->get_query_ctx()->enable_reserve_memory() && + reserve_size > 0) { auto st = thread_context()->try_reserve_memory(reserve_size); COUNTER_UPDATE(_memory_reserve_times, 1); @@ -441,7 +446,7 @@ Status PipelineTask::execute(bool* eos) { DEFER_RELEASE_RESERVED(); COUNTER_UPDATE(_memory_reserve_times, 1); 
auto workload_group = _state->get_query_ctx()->workload_group(); - if (_state->enable_reserve_memory() && workload_group && + if (_state->get_query_ctx()->enable_reserve_memory() && workload_group && !(wake_up_early() || _dry_run)) { const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos); status = sink_reserve_size != 0 diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp index 2bf6a175e51..ab874964c99 100644 --- a/be/src/runtime/memory/memory_profile.cpp +++ b/be/src/runtime/memory/memory_profile.cpp @@ -153,8 +153,7 @@ void MemoryProfile::refresh_memory_overview_profile() { ExecEnv::GetInstance()->rowsets_no_cache_mem_tracker()->set_consumption( MetadataAdder<RowsetMeta>::get_all_rowsets_size()); ExecEnv::GetInstance()->segments_no_cache_mem_tracker()->set_consumption( - MetadataAdder<segment_v2::Segment>::get_all_segments_estimate_size() - - SegmentLoader::instance()->cache_mem_usage()); + MetadataAdder<segment_v2::Segment>::get_all_segments_size()); // 4 refresh tracked memory counter std::unordered_map<MemTrackerLimiter::Type, int64_t> type_mem_sum = { @@ -334,21 +333,16 @@ int64_t MemoryProfile::other_current_usage() { } std::string MemoryProfile::process_memory_detail_str() const { - return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}", + return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}\n, {}\n, {}\n", GlobalMemoryArbitrator::process_mem_log_str(), print_memory_overview_profile(), print_global_memory_profile(), + print_metadata_memory_profile(), print_cache_memory_profile(), print_top_memory_tasks_profile()); } void MemoryProfile::print_log_process_usage() { if (_enable_print_log_process_usage) { _enable_print_log_process_usage = false; - LOG(WARNING) << "Process Memory Summary: " + GlobalMemoryArbitrator::process_mem_log_str(); - LOG(WARNING) << "\n" << print_memory_overview_profile(); - LOG(WARNING) << "\n" << print_global_memory_profile(); - LOG(WARNING) << "\n" << print_metadata_memory_profile(); - LOG(WARNING) << "\n" << print_cache_memory_profile(); - LOG(WARNING) << "\n" << print_top_memory_tasks_profile(); LOG(WARNING) << process_memory_detail_str(); } } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index a50d6041d35..954369e29fa 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -309,6 +309,13 @@ public: bool low_memory_mode() { return _low_memory_mode; } + void disable_reserve_memory() { _enable_reserve_memory = false; } + + bool enable_reserve_memory() const { + return _query_options.__isset.enable_reserve_memory && + _query_options.enable_reserve_memory && _enable_reserve_memory; + } + void update_paused_reason(const Status& st) { std::lock_guard l(_paused_mutex); if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { @@ -391,6 +398,7 @@ private: MonotonicStopWatch _paused_timer; std::atomic<int64_t> _paused_period_secs = 0; std::atomic<bool> _low_memory_mode = false; + std::atomic<bool> _enable_reserve_memory = true; int64_t _user_set_mem_limit = 0; std::atomic<int64_t> _adjusted_mem_limit = 0; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2cbc45dbd94..c4def4c0513 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -540,5 +540,4 @@ std::vector<std::shared_ptr<RuntimeProfile>> RuntimeState::build_pipeline_profil } return _pipeline_id_to_profile; } - } // end namespace doris diff --git a/be/src/runtime/runtime_state.h 
b/be/src/runtime/runtime_state.h index 084f6522229..6c2d0918d8b 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -555,10 +555,6 @@ public: return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; } - bool enable_reserve_memory() const { - return _query_options.__isset.enable_reserve_memory && _query_options.enable_reserve_memory; - } - int64_t spill_min_revocable_mem() const { if (_query_options.__isset.min_revocable_mem) { return std::max(_query_options.min_revocable_mem, (int64_t)1); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 4d458cd440f..eea6bdb5c5a 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -563,7 +563,7 @@ std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { Status BaseTabletsChannel::_write_block_data( const PTabletWriterAddBlockRequest& request, int64_t cur_seq, - std::unordered_map<int64_t, std::vector<uint32_t>>& tablet_to_rowidxs, + std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs, PTabletWriterAddBlockResult* response) { vectorized::Block send_data; RETURN_IF_ERROR(send_data.deserialize(request.block())); @@ -639,7 +639,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } - std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row index */> + std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */> tablet_to_rowidxs; _build_tablet_to_rowidxs(request, &tablet_to_rowidxs); @@ -657,7 +657,7 @@ bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const { void BaseTabletsChannel::_build_tablet_to_rowidxs( const PTabletWriterAddBlockRequest& request, - std::unordered_map<int64_t, std::vector<uint32_t>>* tablet_to_rowidxs) { + std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) { // just add a coarse-grained read lock here rather than each time when visiting _broken_tablets // tests show that a relatively coarse-grained read lock here performs better under multicore scenario // see: https://github.com/apache/doris/pull/28552 diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 87fbf9d06aa..55fe96df750 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -34,6 +34,7 @@ #include "util/runtime_profile.h" #include "util/spinlock.h" #include "util/uid_util.h" +#include "vec/common/custom_allocator.h" namespace google::protobuf { template <typename Element> @@ -121,7 +122,7 @@ public: protected: Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq, - std::unordered_map<int64_t, std::vector<uint32_t>>& tablet_to_rowidxs, + std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs, PTabletWriterAddBlockResult* response); Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); @@ -136,7 +137,7 @@ protected: int64_t tablet_id, Status error) const; void _build_tablet_to_rowidxs( const PTabletWriterAddBlockRequest& request, - std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row index */>* + std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>* tablet_to_rowidxs); virtual void _init_profile(RuntimeProfile* profile); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 7eb31b28400..962db6df333 100644 --- 
a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -688,10 +688,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx->set_memory_sufficient(true); return true; } else if (time_in_queue >= config::spill_in_paused_queue_timeout_ms) { - // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic + // if cannot find any memory to release, then let the query continue to run as far as possible + // or cancelled by gc if memory is really not enough. auto msg1 = fmt::format( - "Query {} failed beause query limit is exceeded, but could " - "not find memory that could release or spill to disk. Query memory usage: " + "Query {} memory limit is exceeded, but could " + "not find memory that could release or spill to disk, disable reserve " + "memory and resume it. Query memory usage: " "{}, limit: {}, reserved " "size: {}, try to reserve: {}, wg info: {}.", query_id, PrettyPrinter::print_bytes(memory_usage), @@ -702,7 +704,9 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& doris::ProcessProfile::instance() ->memory_profile() ->process_memory_detail_str()); - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); + query_ctx->disable_reserve_memory(); + query_ctx->set_memory_sufficient(true); + return true; } else { return false; } @@ -713,9 +717,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx->set_memory_sufficient(true); return true; } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { + // if cannot find any memory to release, then let the query continue to run as far as possible + // or cancelled by gc if memory is really not enough. auto msg1 = fmt::format( - "Query {} failed because workload group memory is exceeded" - ", and there is no cache now. And could not find task to spill. " + "Query {} workload group memory is exceeded" + ", and there is no cache now. And could not find task to spill, disable " + "reserve memory and resume it. " "Query memory usage: {}, limit: {}, reserved " "size: {}, try to reserve: {}, wg info: {}." " Maybe you should set the workload group's limit to a lower value.", @@ -727,7 +734,9 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& doris::ProcessProfile::instance() ->memory_profile() ->process_memory_detail_str()); - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); + query_ctx->disable_reserve_memory(); + query_ctx->set_memory_sufficient(true); + return true; } else { return false; } @@ -745,9 +754,12 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx->set_memory_sufficient(true); return true; } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { + // if cannot find any memory to release, then let the query continue to run as far as possible + // or cancelled by gc if memory is really not enough. auto msg1 = fmt::format( - "Query {} failed because process memory is exceeded" - ", and there is no cache now. And could not find task to spill. " + "Query {} process memory is exceeded" + ", and there is no cache now. And could not find task to spill, disable " + "reserve memory and resume it. " "Query memory usage: {}, limit: {}, reserved " "size: {}, try to reserve: {}, wg info: {}." 
" Maybe you should set the workload group's limit to a lower value.", @@ -759,7 +771,8 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& doris::ProcessProfile::instance() ->memory_profile() ->process_memory_detail_str()); - query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1)); + query_ctx->disable_reserve_memory(); + query_ctx->set_memory_sufficient(true); } else { return false; } @@ -831,6 +844,9 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha if (!query_ctx) { continue; } + if (is_low_watermark) { + query_ctx->set_low_memory_mode(); + } int64_t query_weighted_mem_limit = 0; int64_t expected_query_weighted_mem_limit = 0; // If the query enable hard limit, then it should not use the soft limit diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 29442c8ecac..8584e5df3cd 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -60,6 +60,16 @@ enum CompressionTypePB : int; } // namespace doris::segment_v2 namespace doris::vectorized { +template <typename T> +void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks) { + T block; + while (blocks.try_dequeue(block)) { + // do nothing + } +} + +template void clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&); +template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&); Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} { initialize_index_by_name(); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 5f0aeaae67b..c617ff27b1f 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -262,11 +262,7 @@ void ScannerContext::return_free_block(vectorized::BlockUPtr block) { } void ScannerContext::clear_free_blocks() { - vectorized::BlockUPtr block; - while (_free_blocks.try_dequeue(block)) { - // do nothing - } - block.reset(); + clear_blocks(_free_blocks); } Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 8a2387cc69e..530d9371059 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -303,7 +303,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, if (first_read) { free_block = ctx->get_free_block(first_read); } else { - if (state->enable_reserve_memory()) { + if (state->get_query_ctx()->enable_reserve_memory()) { size_t block_avg_bytes = scanner->get_block_avg_bytes(); auto st = thread_context()->try_reserve_memory(block_avg_bytes); if (!st.ok()) { diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index 6248a28dba5..83e1c491c06 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -48,9 +48,9 @@ class VNodeChannel; // <row_idx, partition_id, tablet_id> class RowPartTabletIds { public: - std::vector<uint32_t> row_ids; - std::vector<int64_t> partition_ids; - std::vector<int64_t> tablet_ids; + DorisVector<uint32_t> row_ids; + DorisVector<int64_t> partition_ids; + DorisVector<int64_t> tablet_ids; std::string debug_string() const { std::string value; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index ed4f71677f2..28756cfe78f 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ 
b/be/src/vec/sink/writer/async_result_writer.cpp @@ -235,5 +235,10 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo return b; } +template <typename T> +void clear_blocks(moodycamel::ConcurrentQueue<T>& blocks); +void AsyncResultWriter::clear_free_blocks() { + clear_blocks(_free_blocks); +} } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 2a90dd2dbd0..a1265655c58 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -72,6 +72,8 @@ public: Status get_writer_status() { return _writer_status.status(); } + void clear_free_blocks(); + protected: Status _projection_block(Block& input_block, Block* output_block); const VExprContextSPtrs& _vec_output_expr_ctxs; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 46a3974bba8..b6b6b623698 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -89,7 +89,7 @@ class DeltaWriterV2Map; struct Rows { int64_t partition_id; int64_t index_id; - std::vector<uint32_t> row_idxes; + DorisVector<uint32_t> row_idxes; }; using RowsForTablet = std::unordered_map<int64_t, Rows>;
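
For context on the std::vector -> DorisVector swap that runs through this patch: below is a minimal, self-contained sketch of the idea, assuming DorisVector is a std::vector alias bound to a memory-tracking allocator (the newly added include of vec/common/custom_allocator.h in tablets_channel.h points at the real definition). The TrackingAllocator and g_tracked_bytes here are hypothetical stand-ins for Doris's MemTracker machinery, not the actual implementation.

// Sketch only: illustrates how an allocator-aware vector alias makes
// container growth visible to a tracker. Not the Doris implementation.
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

std::atomic<int64_t> g_tracked_bytes{0}; // hypothetical stand-in for a MemTracker

template <typename T>
struct TrackingAllocator {
    using value_type = T;

    TrackingAllocator() = default;
    template <typename U>
    TrackingAllocator(const TrackingAllocator<U>&) {} // rebind support

    T* allocate(std::size_t n) {
        // Charge the tracker before handing out memory.
        g_tracked_bytes.fetch_add(static_cast<int64_t>(n * sizeof(T)),
                                  std::memory_order_relaxed);
        return static_cast<T*>(::operator new(n * sizeof(T)));
    }
    void deallocate(T* p, std::size_t n) {
        g_tracked_bytes.fetch_sub(static_cast<int64_t>(n * sizeof(T)),
                                  std::memory_order_relaxed);
        ::operator delete(p);
    }
};

template <typename T, typename U>
bool operator==(const TrackingAllocator<T>&, const TrackingAllocator<U>&) { return true; }
template <typename T, typename U>
bool operator!=(const TrackingAllocator<T>&, const TrackingAllocator<U>&) { return false; }

// The alias the patch substitutes for std::vector in the memtable/load path.
template <typename T>
using DorisVector = std::vector<T, TrackingAllocator<T>>;

int main() {
    DorisVector<uint32_t> row_idxs;
    row_idxs.reserve(1 << 20); // ~4 MiB of row indexes, charged to the tracker
    std::cout << "tracked bytes: " << g_tracked_bytes.load() << "\n";
    return 0;
}

The payoff, under that assumption, is that buffers like Rows::row_idxes and the memtable's _row_in_blocks are charged to the load's tracker as they grow, so the low-memory-mode decisions this commit adds (set_low_memory_mode on operators, clear_free_blocks on writers and scanner contexts) act on accurate per-query usage rather than untracked container memory.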