This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 643f622b515 make memtable usage counter more accurate (#47017) 643f622b515 is described below commit 643f622b51564e6204f56f26a5b9b0d2cd7f785a Author: yiguolei <guo...@selectdb.com> AuthorDate: Wed Jan 15 13:40:31 2025 +0800 make memtable usage counter more accurate (#47017) --- be/src/olap/memtable.cpp | 86 ++++++++++++++++++++++++++---------------------- be/src/olap/memtable.h | 6 ++-- 2 files changed, 50 insertions(+), 42 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 8593f1ef482..67b68f978a1 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -61,6 +61,10 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem _total_size_of_aggregate_states(0) { g_memtable_cnt << 1; _query_thread_context.init_unlocked(); + _mem_tracker = std::make_shared<MemTracker>(); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + _query_thread_context.query_mem_tracker->write_tracker()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); _arena = std::make_unique<vectorized::Arena>(); _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema); _num_columns = _tablet_schema->num_columns(); @@ -77,7 +81,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem } // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); - _mem_tracker = std::make_shared<MemTracker>(); + _row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>(); } void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, @@ -145,6 +149,34 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) { MemTable::~MemTable() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( _query_thread_context.query_mem_tracker->write_tracker()); + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); + g_memtable_cnt << -1; + if (_keys_type != 
KeysType::DUP_KEYS) { + for (auto it = _row_in_blocks->begin(); it != _row_in_blocks->end(); it++) { + if (!(*it)->has_init_agg()) { + continue; + } + // We should release agg_places here, because they are not released when a + // load is canceled. + for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { + auto function = _agg_functions[i]; + DCHECK(function != nullptr); + function->destroy((*it)->agg_places(i)); + } + } + } + std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(), + std::default_delete<RowInBlock>()); + // Arena has to be destroyed after agg state, because some agg state's memory may be + // allocated in arena. + _arena.reset(); + _vec_row_comparator.reset(); + _row_in_blocks.reset(); + _agg_functions.clear(); + _input_mutable_block.clear(); + _output_mutable_block.clear(); + } if (_is_flush_success) { // If the memtable is flush success, then its memtracker's consumption should be 0 if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) { @@ -152,28 +184,6 @@ MemTable::~MemTable() { << _mem_tracker->consumption(); } } - g_memtable_cnt << -1; - if (_keys_type != KeysType::DUP_KEYS) { - for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) { - if (!(*it)->has_init_agg()) { - continue; - } - // We should release agg_places here, because they are not released when a - // load is canceled. 
- for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { - auto function = _agg_functions[i]; - DCHECK(function != nullptr); - function->destroy((*it)->agg_places(i)); - } - } - } - std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>()); - _arena.reset(); - _vec_row_comparator.reset(); - _row_in_blocks.clear(); - _agg_functions.clear(); - _input_mutable_block.clear(); - _output_mutable_block.clear(); } int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const { @@ -227,7 +237,7 @@ Status MemTable::insert(const vectorized::Block* input_block, RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), row_idxs.data() + num_rows, &_column_offset)); for (int i = 0; i < num_rows; i++) { - _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); + _row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock + i}); } _stat.raw_rows += num_rows; @@ -285,8 +295,8 @@ Status MemTable::_put_into_output(vectorized::Block& in_block) { DorisVector<uint32_t> row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); row_pos_vec.reserve(in_block.rows()); - for (int i = 0; i < _row_in_blocks.size(); i++) { - row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); + for (int i = 0; i < _row_in_blocks->size(); i++) { + row_pos_vec.emplace_back((*_row_in_blocks)[i]->_row_pos); } return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), row_pos_vec.data() + in_block.rows()); @@ -297,19 +307,19 @@ size_t MemTable::_sort() { _stat.sort_times++; size_t same_keys_num = 0; // sort new rows - Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size()); + Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size()); for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) { auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); }; - 
_sort_one_column(_row_in_blocks, tie, cmp); + _sort_one_column(*_row_in_blocks, tie, cmp); } bool is_dup = (_keys_type == KeysType::DUP_KEYS); // sort extra round by _row_pos to make the sort stable auto iter = tie.iter(); while (iter.next()) { - pdqsort(std::next(_row_in_blocks.begin(), iter.left()), - std::next(_row_in_blocks.begin(), iter.right()), + pdqsort(std::next(_row_in_blocks->begin(), iter.left()), + std::next(_row_in_blocks->begin(), iter.right()), [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos; }); @@ -327,9 +337,9 @@ size_t MemTable::_sort() { return value < 0; } }; - auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos); - std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func); - _last_sorted_pos = _row_in_blocks.size(); + auto new_row_it = std::next(_row_in_blocks->begin(), _last_sorted_pos); + std::inplace_merge(_row_in_blocks->begin(), new_row_it, _row_in_blocks->end(), cmp_func); + _last_sorted_pos = _row_in_blocks->size(); return same_keys_num; } @@ -483,7 +493,7 @@ void MemTable::_aggregate() { }; if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) { - for (RowInBlock* cur_row : _row_in_blocks) { + for (RowInBlock* cur_row : *_row_in_blocks) { if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { if (!prev_row->has_init_agg()) { init_for_agg(prev_row); @@ -542,7 +552,7 @@ void MemTable::_aggregate() { auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) ->get_data(); - for (auto* cur_row : _row_in_blocks) { + for (auto* cur_row : *_row_in_blocks) { const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id); // compare keys, the keys of row_with_seq_col and row_with_seq_col is the same, @@ -576,8 +586,8 @@ void 
MemTable::_aggregate() { _output_mutable_block = vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); _output_mutable_block.clear_column_data(); - _row_in_blocks = temp_row_in_blocks; - _last_sorted_pos = _row_in_blocks.size(); + *_row_in_blocks = temp_row_in_blocks; + _last_sorted_pos = _row_in_blocks->size(); } } @@ -648,8 +658,6 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) { RETURN_IF_ERROR(_sort_by_cluster_keys()); } _input_mutable_block.clear(); - // After to block, all data in arena is saved in the block - _arena.reset(); *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); return Status::OK(); } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index f4b1de45272..5e98bd51a74 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -263,10 +263,10 @@ private: bool _is_first_insertion; void _init_agg_functions(const vectorized::Block* block); - DorisVector<vectorized::AggregateFunctionPtr> _agg_functions; - DorisVector<size_t> _offsets_of_aggregate_states; + std::vector<vectorized::AggregateFunctionPtr> _agg_functions; + std::vector<size_t> _offsets_of_aggregate_states; size_t _total_size_of_aggregate_states; - DorisVector<RowInBlock*> _row_in_blocks; + std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks; size_t _num_columns; int32_t _seq_col_idx_in_block = -1; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org