This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 643f622b515 make memtable usage counter more accurate (#47017)
643f622b515 is described below

commit 643f622b51564e6204f56f26a5b9b0d2cd7f785a
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Wed Jan 15 13:40:31 2025 +0800

    make memtable usage counter more accurate (#47017)
---
 be/src/olap/memtable.cpp | 86 ++++++++++++++++++++++++++----------------------
 be/src/olap/memtable.h   |  6 ++--
 2 files changed, 50 insertions(+), 42 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 8593f1ef482..67b68f978a1 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -61,6 +61,10 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
           _total_size_of_aggregate_states(0) {
     g_memtable_cnt << 1;
     _query_thread_context.init_unlocked();
+    _mem_tracker = std::make_shared<MemTracker>();
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+            _query_thread_context.query_mem_tracker->write_tracker());
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     _arena = std::make_unique<vectorized::Arena>();
     _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema);
     _num_columns = _tablet_schema->num_columns();
@@ -77,7 +81,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
     }
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
-    _mem_tracker = std::make_shared<MemTracker>();
+    _row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>();
 }
 
 void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
@@ -145,6 +149,34 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 MemTable::~MemTable() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
             _query_thread_context.query_mem_tracker->write_tracker());
+    {
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+        g_memtable_cnt << -1;
+        if (_keys_type != KeysType::DUP_KEYS) {
+            for (auto it = _row_in_blocks->begin(); it != _row_in_blocks->end(); it++) {
+                if (!(*it)->has_init_agg()) {
+                    continue;
+                }
+                // We should release agg_places here, because they are not released when a
+                // load is canceled.
+                for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
+                    auto function = _agg_functions[i];
+                    DCHECK(function != nullptr);
+                    function->destroy((*it)->agg_places(i));
+                }
+            }
+        }
+        std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(),
+                      std::default_delete<RowInBlock>());
+        // Arena has to be destroyed after agg state, because some agg state's memory may be
+        // allocated in arena.
+        _arena.reset();
+        _vec_row_comparator.reset();
+        _row_in_blocks.reset();
+        _agg_functions.clear();
+        _input_mutable_block.clear();
+        _output_mutable_block.clear();
+    }
     if (_is_flush_success) {
         // If the memtable is flush success, then its memtracker's consumption should be 0
         if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
@@ -152,28 +184,6 @@ MemTable::~MemTable() {
                        << _mem_tracker->consumption();
         }
     }
-    g_memtable_cnt << -1;
-    if (_keys_type != KeysType::DUP_KEYS) {
-        for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) {
-            if (!(*it)->has_init_agg()) {
-                continue;
-            }
-            // We should release agg_places here, because they are not released when a
-            // load is canceled.
-            for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
-                auto function = _agg_functions[i];
-                DCHECK(function != nullptr);
-                function->destroy((*it)->agg_places(i));
-            }
-        }
-    }
-    std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
-    _arena.reset();
-    _vec_row_comparator.reset();
-    _row_in_blocks.clear();
-    _agg_functions.clear();
-    _input_mutable_block.clear();
-    _output_mutable_block.clear();
 }
 
 int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const {
@@ -227,7 +237,7 @@ Status MemTable::insert(const vectorized::Block* input_block,
     RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(),
                                                   row_idxs.data() + num_rows, &_column_offset));
     for (int i = 0; i < num_rows; i++) {
-        _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
+        _row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock + i});
     }
 
     _stat.raw_rows += num_rows;
@@ -285,8 +295,8 @@ Status MemTable::_put_into_output(vectorized::Block& in_block) {
     DorisVector<uint32_t> row_pos_vec;
     DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
     row_pos_vec.reserve(in_block.rows());
-    for (int i = 0; i < _row_in_blocks.size(); i++) {
-        row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
+    for (int i = 0; i < _row_in_blocks->size(); i++) {
+        row_pos_vec.emplace_back((*_row_in_blocks)[i]->_row_pos);
     }
     return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
                                           row_pos_vec.data() + in_block.rows());
@@ -297,19 +307,19 @@ size_t MemTable::_sort() {
     _stat.sort_times++;
     size_t same_keys_num = 0;
     // sort new rows
-    Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
+    Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size());
     for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
         auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
             return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
         };
-        _sort_one_column(_row_in_blocks, tie, cmp);
+        _sort_one_column(*_row_in_blocks, tie, cmp);
     }
     bool is_dup = (_keys_type == KeysType::DUP_KEYS);
     // sort extra round by _row_pos to make the sort stable
     auto iter = tie.iter();
     while (iter.next()) {
-        pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
-                std::next(_row_in_blocks.begin(), iter.right()),
+        pdqsort(std::next(_row_in_blocks->begin(), iter.left()),
+                std::next(_row_in_blocks->begin(), iter.right()),
                 [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
                     return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos;
                 });
@@ -327,9 +337,9 @@ size_t MemTable::_sort() {
             return value < 0;
         }
     };
-    auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
-    std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func);
-    _last_sorted_pos = _row_in_blocks.size();
+    auto new_row_it = std::next(_row_in_blocks->begin(), _last_sorted_pos);
+    std::inplace_merge(_row_in_blocks->begin(), new_row_it, _row_in_blocks->end(), cmp_func);
+    _last_sorted_pos = _row_in_blocks->size();
     return same_keys_num;
 }
 
@@ -483,7 +493,7 @@ void MemTable::_aggregate() {
     };
 
     if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) {
-        for (RowInBlock* cur_row : _row_in_blocks) {
+        for (RowInBlock* cur_row : *_row_in_blocks) {
             if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) {
                 if (!prev_row->has_init_agg()) {
                     init_for_agg(prev_row);
@@ -542,7 +552,7 @@ void MemTable::_aggregate() {
         auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>(
                                      mutable_block.mutable_columns()[_skip_bitmap_col_idx].get())
                                      ->get_data();
-        for (auto* cur_row : _row_in_blocks) {
+        for (auto* cur_row : *_row_in_blocks) {
             const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos];
             bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id);
             // compare keys, the keys of row_with_seq_col and row_with_seq_col is the same,
@@ -576,8 +586,8 @@ void MemTable::_aggregate() {
         _output_mutable_block =
                 vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
         _output_mutable_block.clear_column_data();
-        _row_in_blocks = temp_row_in_blocks;
-        _last_sorted_pos = _row_in_blocks.size();
+        *_row_in_blocks = temp_row_in_blocks;
+        _last_sorted_pos = _row_in_blocks->size();
     }
 }
 
@@ -648,8 +658,6 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
         RETURN_IF_ERROR(_sort_by_cluster_keys());
     }
     _input_mutable_block.clear();
-    // After to block, all data in arena is saved in the block
-    _arena.reset();
     *res = vectorized::Block::create_unique(_output_mutable_block.to_block());
     return Status::OK();
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index f4b1de45272..5e98bd51a74 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -263,10 +263,10 @@ private:
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
-    DorisVector<vectorized::AggregateFunctionPtr> _agg_functions;
-    DorisVector<size_t> _offsets_of_aggregate_states;
+    std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
+    std::vector<size_t> _offsets_of_aggregate_states;
     size_t _total_size_of_aggregate_states;
-    DorisVector<RowInBlock*> _row_in_blocks;
+    std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks;
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to