This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a01d1ae [Compaction] track RowsetReader's mem & add metric (#4068) a01d1ae is described below commit a01d1aec568247c681e26d3815421996f0c62544 Author: HuangWei <huang...@apache.org> AuthorDate: Fri Jul 24 07:58:09 2020 +0800 [Compaction] track RowsetReader's mem & add metric (#4068) Ref https://github.com/apache/incubator-doris/issues/3624#issuecomment-655933244 Only RowsetReaders in compaction are tracked. Other RowsetReaders won't be affected, because their parent_tracker is nullptr. --- be/src/olap/base_compaction.cpp | 6 +- be/src/olap/base_compaction.h | 2 +- be/src/olap/compaction.cpp | 17 +++--- be/src/olap/compaction.h | 7 ++- be/src/olap/cumulative_compaction.cpp | 8 +-- be/src/olap/cumulative_compaction.h | 3 +- be/src/olap/row_block.cpp | 4 +- be/src/olap/row_block.h | 2 +- be/src/olap/rowset/alpha_rowset.cpp | 12 +++- be/src/olap/rowset/alpha_rowset.h | 27 ++++----- be/src/olap/rowset/alpha_rowset_reader.cpp | 21 +++---- be/src/olap/rowset/alpha_rowset_reader.h | 6 +- be/src/olap/rowset/beta_rowset.cpp | 6 ++ be/src/olap/rowset/beta_rowset.h | 13 +++-- be/src/olap/rowset/beta_rowset_reader.cpp | 6 +- be/src/olap/rowset/beta_rowset_reader.h | 16 +++--- be/src/olap/rowset/column_data.cpp | 23 ++++---- be/src/olap/rowset/column_data.h | 5 +- be/src/olap/rowset/rowset.h | 5 ++ be/src/olap/rowset/segment_reader.cpp | 67 ++++++++++------------ be/src/olap/rowset/segment_reader.h | 17 ++---- be/src/olap/storage_engine.cpp | 39 ++++++++----- be/src/olap/storage_engine.h | 2 + be/src/runtime/initial_reservations.cc | 2 +- be/src/runtime/vectorized_row_batch.cpp | 11 ++-- be/src/runtime/vectorized_row_batch.h | 6 +- be/src/util/doris_metrics.h | 22 +++---- .../operation/monitor-metrics/be-metrics.md | 7 +++ .../operation/monitor-metrics/be-metrics.md | 7 +++ 29 files changed, 211 insertions(+), 158 deletions(-) diff --git a/be/src/olap/base_compaction.cpp 
b/be/src/olap/base_compaction.cpp index e155335..0ccb4ca 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -21,9 +21,9 @@ namespace doris { -BaseCompaction::BaseCompaction(TabletSharedPtr tablet) - : Compaction(tablet) -{ } +BaseCompaction::BaseCompaction(TabletSharedPtr tablet, const std::string& label, + MemTracker* parent_tracker) + : Compaction(tablet, label, parent_tracker) {} BaseCompaction::~BaseCompaction() { } diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index ff42aa5..9ea54a9 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -29,7 +29,7 @@ namespace doris { class BaseCompaction : public Compaction { public: - BaseCompaction(TabletSharedPtr tablet); + BaseCompaction(TabletSharedPtr tablet, const std::string& label, MemTracker* parent_tracker); ~BaseCompaction() override; OLAPStatus compact() override; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index adb3b6a..5a74671 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -27,12 +27,13 @@ namespace doris { Semaphore Compaction::_concurrency_sem; -Compaction::Compaction(TabletSharedPtr tablet) - : _tablet(tablet), - _input_rowsets_size(0), - _input_row_num(0), - _state(CompactionState::INITED) -{ } +Compaction::Compaction(TabletSharedPtr tablet, const std::string& label, MemTracker* parent_tracker) + : _mem_tracker(-1, label, parent_tracker, true), + _readers_tracker(-1, "readers tracker", &_mem_tracker, true), + _tablet(tablet), + _input_rowsets_size(0), + _input_row_num(0), + _state(CompactionState::INITED) {} Compaction::~Compaction() {} @@ -74,6 +75,7 @@ OLAPStatus Compaction::do_compaction_impl() { TRACE("prepare finished"); // 2. 
write merged rows to output rowset + // The test results show that the merger has a low memory footprint, so there is no need to track its mem pool Merger::Statistics stats; auto res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers, _output_rs_writer.get(), &stats); if (res != OLAP_SUCCESS) { @@ -143,6 +145,7 @@ OLAPStatus Compaction::construct_output_rowset_writer() { context.version = _output_version; context.version_hash = _output_version_hash; context.segments_overlap = NONOVERLAPPING; + // The test results show that a single rs writer has a low memory footprint, so there is no need to track its mem pool RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &_output_rs_writer)); return OLAP_SUCCESS; } @@ -150,7 +153,7 @@ OLAPStatus Compaction::construct_input_rowset_readers() { for (auto& rowset : _input_rowsets) { RowsetReaderSharedPtr rs_reader; - RETURN_NOT_OK(rowset->create_reader(&rs_reader)); + RETURN_NOT_OK(rowset->create_reader(&_readers_tracker, &rs_reader)); _input_rs_readers.push_back(std::move(rs_reader)); } return OLAP_SUCCESS; diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 9837ac5..9507cc2 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -44,7 +44,7 @@ class Merger; // 4.
gc unused rowstes class Compaction { public: - Compaction(TabletSharedPtr tablet); + Compaction(TabletSharedPtr tablet, const std::string& label, MemTracker* parent_tracker); virtual ~Compaction(); virtual OLAPStatus compact() = 0; @@ -77,6 +77,11 @@ private: int64_t _get_input_num_rows_from_seg_grps(); protected: + // the root tracker for this compaction + MemTracker _mem_tracker; + + // the child of root, only track rowset readers mem + MemTracker _readers_tracker; TabletSharedPtr _tablet; std::vector<RowsetSharedPtr> _input_rowsets; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index dddf02c..3494561 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -22,10 +22,10 @@ namespace doris { -CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet) - : Compaction(tablet), - _cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes) -{ } +CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet, const std::string& label, + MemTracker* parent_tracker) + : Compaction(tablet, label, parent_tracker), + _cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes) {} CumulativeCompaction::~CumulativeCompaction() { } diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index 08b650e..49e5f2c 100755 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -26,7 +26,8 @@ namespace doris { class CumulativeCompaction : public Compaction { public: - CumulativeCompaction(TabletSharedPtr tablet); + CumulativeCompaction(TabletSharedPtr tablet, const std::string& label, + MemTracker* parent_tracker); ~CumulativeCompaction() override; OLAPStatus compact() override; diff --git a/be/src/olap/row_block.cpp b/be/src/olap/row_block.cpp index aa00e59..09cf48b 100644 --- a/be/src/olap/row_block.cpp +++ b/be/src/olap/row_block.cpp @@ -37,10 +37,10 @@ using std::vector; namespace 
doris { -RowBlock::RowBlock(const TabletSchema* schema) : +RowBlock::RowBlock(const TabletSchema* schema, MemTracker* parent_tracker) : _capacity(0), _schema(schema) { - _tracker.reset(new MemTracker(-1)); + _tracker.reset(new MemTracker(-1, "RowBlock", parent_tracker, true)); _mem_pool.reset(new MemPool(_tracker.get())); } diff --git a/be/src/olap/row_block.h b/be/src/olap/row_block.h index cb99128..c9d277f 100644 --- a/be/src/olap/row_block.h +++ b/be/src/olap/row_block.h @@ -56,7 +56,7 @@ class RowBlock { friend class RowBlockChanger; friend class VectorizedRowBatch; public: - RowBlock(const TabletSchema* schema); + RowBlock(const TabletSchema* schema, MemTracker* parent_tracker = nullptr); // 注意回收内部buffer ~RowBlock(); diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp index 8292913..337a049 100644 --- a/be/src/olap/rowset/alpha_rowset.cpp +++ b/be/src/olap/rowset/alpha_rowset.cpp @@ -52,8 +52,16 @@ OLAPStatus AlphaRowset::do_load(bool use_cache) { } OLAPStatus AlphaRowset::create_reader(std::shared_ptr<RowsetReader>* result) { - result->reset(new AlphaRowsetReader( - _schema->num_rows_per_row_block(), std::static_pointer_cast<AlphaRowset>(shared_from_this()))); + result->reset(new AlphaRowsetReader(_schema->num_rows_per_row_block(), + std::static_pointer_cast<AlphaRowset>(shared_from_this()))); + return OLAP_SUCCESS; +} + +OLAPStatus AlphaRowset::create_reader(MemTracker* parent_tracker, + std::shared_ptr<RowsetReader>* result) { + result->reset(new AlphaRowsetReader(_schema->num_rows_per_row_block(), + std::static_pointer_cast<AlphaRowset>(shared_from_this()), + parent_tracker)); return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/alpha_rowset.h b/be/src/olap/rowset/alpha_rowset.h index 1d4f8a9..bff9618 100644 --- a/be/src/olap/rowset/alpha_rowset.h +++ b/be/src/olap/rowset/alpha_rowset.h @@ -18,14 +18,14 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_H #define DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_H +#include 
"olap/data_dir.h" #include "olap/rowset/rowset.h" -#include "olap/rowset/segment_group.h" #include "olap/rowset/rowset_meta.h" -#include "olap/data_dir.h" +#include "olap/rowset/segment_group.h" #include "olap/tuple.h" -#include <vector> #include <memory> +#include <vector> namespace doris { @@ -42,8 +42,10 @@ public: OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) override; - OLAPStatus split_range(const RowCursor& start_key, - const RowCursor& end_key, + OLAPStatus create_reader(MemTracker* parent_tracker, + std::shared_ptr<RowsetReader>* result) override; + + OLAPStatus split_range(const RowCursor& start_key, const RowCursor& end_key, uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) override; @@ -54,13 +56,13 @@ public: OLAPStatus copy_files_to(const std::string& dir) override; OLAPStatus convert_from_old_files(const std::string& snapshot_path, - std::vector<std::string>* success_files); + std::vector<std::string>* success_files); - OLAPStatus convert_to_old_files(const std::string& snapshot_path, - std::vector<std::string>* success_files); + OLAPStatus convert_to_old_files(const std::string& snapshot_path, + std::vector<std::string>* success_files); OLAPStatus remove_old_files(std::vector<std::string>* files_to_remove) override; - + bool check_path(const std::string& path) override; // when convert from old be, should set row num, index size, data size @@ -70,16 +72,15 @@ public: protected: friend class RowsetFactory; - AlphaRowset(const TabletSchema* schema, - std::string rowset_path, + AlphaRowset(const TabletSchema* schema, std::string rowset_path, RowsetMetaSharedPtr rowset_meta); - + // init segment groups OLAPStatus init() override; OLAPStatus do_load(bool use_cache) override; - void do_close() override { } + void do_close() override {} // add custom logic when rowset is published void make_visible_extra(Version version, VersionHash version_hash) override; diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp 
b/be/src/olap/rowset/alpha_rowset_reader.cpp index 278576a..906eeeb 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.cpp +++ b/be/src/olap/rowset/alpha_rowset_reader.cpp @@ -21,14 +21,15 @@ namespace doris { -AlphaRowsetReader::AlphaRowsetReader( - int num_rows_per_row_block, - AlphaRowsetSharedPtr rowset) - : _num_rows_per_row_block(num_rows_per_row_block), - _rowset(std::move(rowset)), - _alpha_rowset_meta(std::static_pointer_cast<AlphaRowsetMeta>(_rowset->rowset_meta()).get()), - _segment_groups(_rowset->_segment_groups), - _key_range_size(0) { +AlphaRowsetReader::AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetSharedPtr rowset, + MemTracker* parent_tracker) + : _num_rows_per_row_block(num_rows_per_row_block), + _rowset(std::move(rowset)), + _parent_tracker(parent_tracker), + _alpha_rowset_meta( + std::static_pointer_cast<AlphaRowsetMeta>(_rowset->rowset_meta()).get()), + _segment_groups(_rowset->_segment_groups), + _key_range_size(0) { _rowset->aquire(); } @@ -57,7 +58,7 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { // 2) we have several segment groups (_is_segments_overlapping && _merge_ctxs.size() > 1) if (_current_read_context->need_ordered_result && _is_segments_overlapping && _merge_ctxs.size() > 1) { _next_block = &AlphaRowsetReader::_merge_block; - _read_block.reset(new (std::nothrow) RowBlock(_current_read_context->tablet_schema)); + _read_block.reset(new (std::nothrow) RowBlock(_current_read_context->tablet_schema, _parent_tracker)); if (_read_block == nullptr) { LOG(WARNING) << "new row block failed in reader"; return OLAP_ERR_MALLOC_ERROR; @@ -332,7 +333,7 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context const bool use_index_stream_cache = read_context->reader_type == READER_QUERY; for (auto& segment_group : _segment_groups) { - std::unique_ptr<ColumnData> new_column_data(ColumnData::create(segment_group.get())); + std::unique_ptr<ColumnData> 
new_column_data(ColumnData::create(segment_group.get(), _parent_tracker)); OLAPStatus status = new_column_data->init(); if (status != OLAP_SUCCESS) { LOG(WARNING) << "init column data failed"; diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h index 3f3eb58..222523a 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -52,7 +52,8 @@ struct AlphaMergeContextComparator { class AlphaRowsetReader : public RowsetReader { public: - AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetSharedPtr rowset); + AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetSharedPtr rowset, + MemTracker* parent_tracker = nullptr); ~AlphaRowsetReader() override; @@ -60,6 +61,8 @@ public: OLAPStatus init(RowsetReaderContext* read_context) override; // read next block data + // If parent_tracker is not null, the block we get from next_block() will have the parent_tracker. + // It's ok, because we only get ref here, the block's owner is this reader. 
OLAPStatus next_block(RowBlock** block) override; bool delete_flag() override; @@ -101,6 +104,7 @@ private: private: int _num_rows_per_row_block; AlphaRowsetSharedPtr _rowset; + MemTracker* _parent_tracker; std::string _rowset_path; AlphaRowsetMeta* _alpha_rowset_meta; const std::vector<std::shared_ptr<SegmentGroup>>& _segment_groups; diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 6207034..784872a 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -67,6 +67,12 @@ OLAPStatus BetaRowset::create_reader(RowsetReaderSharedPtr* result) { return OLAP_SUCCESS; } +OLAPStatus BetaRowset::create_reader(MemTracker* parent_tracker, std::shared_ptr<RowsetReader>* result) { + // NOTE: We use std::static_pointer_cast for performance + result->reset(new BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this()), parent_tracker)); + return OLAP_SUCCESS; +} + OLAPStatus BetaRowset::split_range(const RowCursor& start_key, const RowCursor& end_key, uint64_t request_block_row_count, diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 58ae2e0..6704fd5 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -39,11 +39,13 @@ public: OLAPStatus create_reader(RowsetReaderSharedPtr* result) override; - static std::string segment_file_path( - const std::string& segment_dir, const RowsetId& rowset_id, int segment_id); + OLAPStatus create_reader(MemTracker* parent_tracker, + std::shared_ptr<RowsetReader>* result) override; - OLAPStatus split_range(const RowCursor& start_key, - const RowCursor& end_key, + static std::string segment_file_path(const std::string& segment_dir, const RowsetId& rowset_id, + int segment_id); + + OLAPStatus split_range(const RowCursor& start_key, const RowCursor& end_key, uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) override; @@ -61,8 +63,7 @@ public: bool check_path(const std::string& 
path) override; protected: - BetaRowset(const TabletSchema* schema, - std::string rowset_path, + BetaRowset(const TabletSchema* schema, std::string rowset_path, RowsetMetaSharedPtr rowset_meta); // init segment groups diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 148de34..dbc2ea1 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -27,8 +27,8 @@ namespace doris { -BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset) - : _rowset(std::move(rowset)), _stats(&_owned_stats) { +BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset, MemTracker* parent_tracker) + : _rowset(std::move(rowset)), _stats(&_owned_stats), _parent_tracker(parent_tracker) { _rowset->aquire(); } @@ -99,7 +99,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { _input_block.reset(new RowBlockV2(schema, 1024)); // init output block and row - _output_block.reset(new RowBlock(read_context->tablet_schema)); + _output_block.reset(new RowBlock(read_context->tablet_schema, _parent_tracker)); RowBlockInfo output_block_info; output_block_info.row_num = 1024; output_block_info.null_supported = true; diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index ae5b687..3683540 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -18,10 +18,10 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H #define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H +#include "olap/iterators.h" #include "olap/row_block.h" #include "olap/row_block2.h" #include "olap/row_cursor.h" -#include "olap/iterators.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset_reader.h" @@ -29,14 +29,14 @@ namespace doris { class BetaRowsetReader : public RowsetReader { public: - explicit BetaRowsetReader(BetaRowsetSharedPtr rowset); + BetaRowsetReader(BetaRowsetSharedPtr rowset, MemTracker* 
parent_tracker = nullptr); - ~BetaRowsetReader() override { - _rowset->release(); - } + ~BetaRowsetReader() override { _rowset->release(); } OLAPStatus init(RowsetReaderContext* read_context) override; + // If parent_tracker is not null, the block we get from next_block() will have the parent_tracker. + // It's ok, because we only get ref here, the block's owner is this reader. OLAPStatus next_block(RowBlock** block) override; bool delete_flag() override { return _rowset->delete_flag(); } @@ -47,9 +47,7 @@ public: RowsetSharedPtr rowset() override { return std::dynamic_pointer_cast<Rowset>(_rowset); } - int64_t filtered_rows() override { - return _stats->rows_del_filtered; - } + int64_t filtered_rows() override { return _stats->rows_del_filtered; } private: BetaRowsetSharedPtr _rowset; @@ -58,6 +56,8 @@ private: OlapReaderStatistics _owned_stats; OlapReaderStatistics* _stats; + MemTracker* _parent_tracker; + std::unique_ptr<RowwiseIterator> _iterator; std::unique_ptr<RowBlockV2> _input_block; diff --git a/be/src/olap/rowset/column_data.cpp b/be/src/olap/rowset/column_data.cpp index 080564d..e330255 100644 --- a/be/src/olap/rowset/column_data.cpp +++ b/be/src/olap/rowset/column_data.cpp @@ -24,13 +24,14 @@ namespace doris { -ColumnData* ColumnData::create(SegmentGroup* segment_group) { - ColumnData* data = new(std::nothrow) ColumnData(segment_group); +ColumnData* ColumnData::create(SegmentGroup* segment_group, MemTracker* parent_tracker) { + ColumnData* data = new (std::nothrow) ColumnData(segment_group, parent_tracker); return data; } -ColumnData::ColumnData(SegmentGroup* segment_group) +ColumnData::ColumnData(SegmentGroup* segment_group, MemTracker* parent_tracker) : _segment_group(segment_group), + _parent_tracker(parent_tracker), _eof(false), _conditions(nullptr), _col_predicates(nullptr), @@ -135,7 +136,7 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi _segment_reader = new(std::nothrow) SegmentReader( file_name, 
segment_group(), block_pos.segment, _seek_columns, _load_bf_columns, _conditions, - _delete_handler, _delete_status, _lru_cache, _runtime_state, _stats); + _delete_handler, _delete_status, _lru_cache, _runtime_state, _stats, _parent_tracker); if (_segment_reader == nullptr) { OLAP_LOG_WARNING("fail to malloc segment reader."); return OLAP_ERR_MALLOC_ERROR; @@ -435,13 +436,15 @@ void ColumnData::set_read_params( LOG(WARNING) << "fail to init row_cursor"; } - _read_vector_batch.reset(new VectorizedRowBatch( - &(_segment_group->get_tablet_schema()), _return_columns, _num_rows_per_block)); + _read_vector_batch.reset(new VectorizedRowBatch(&(_segment_group->get_tablet_schema()), + _return_columns, _num_rows_per_block, + _parent_tracker)); - _seek_vector_batch.reset(new VectorizedRowBatch( - &(_segment_group->get_tablet_schema()), _seek_columns, _num_rows_per_block)); + _seek_vector_batch.reset(new VectorizedRowBatch(&(_segment_group->get_tablet_schema()), + _seek_columns, _num_rows_per_block, + _parent_tracker)); - _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema()))); + _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema()), _parent_tracker)); RowBlockInfo block_info; block_info.row_num = _num_rows_per_block; block_info.null_supported = true; @@ -578,7 +581,7 @@ OLAPStatus ColumnData::schema_change_init() { _read_vector_batch.reset(new VectorizedRowBatch( &(_segment_group->get_tablet_schema()), _return_columns, _num_rows_per_block)); - _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema()))); + _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema()), _parent_tracker)); RowBlockInfo block_info; block_info.row_num = _num_rows_per_block; diff --git a/be/src/olap/rowset/column_data.h b/be/src/olap/rowset/column_data.h index 06b9a8c..3363a6c 100644 --- a/be/src/olap/rowset/column_data.h +++ b/be/src/olap/rowset/column_data.h @@ -40,8 +40,8 @@ class SegmentReader; // This class is column data reader. 
this class will be used in two case. class ColumnData { public: - static ColumnData* create(SegmentGroup* segment_group); - explicit ColumnData(SegmentGroup* segment_group); + static ColumnData* create(SegmentGroup* segment_group, MemTracker* parent_tracker = nullptr); + ColumnData(SegmentGroup* segment_group, MemTracker* parent_tracker = nullptr); ~ColumnData(); // 为了与之前兼容, 暴露部分index的接口 @@ -155,6 +155,7 @@ private: } private: SegmentGroup* _segment_group; + MemTracker* _parent_tracker; // 当到达文件末尾或者到达end key时设置此标志 bool _eof; const Conditions* _conditions; diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 2c509eb..07a1b9d 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -29,6 +29,7 @@ namespace doris { class DataDir; +class MemTracker; class OlapTuple; class RowCursor; class Rowset; @@ -118,6 +119,10 @@ public: // returns OLAP_ERR_ROWSET_CREATE_READER when failed to create reader virtual OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) = 0; + // Support adding parent tracker, but should be careful about destruction sequence. + virtual OLAPStatus create_reader(MemTracker* parent_tracker, + std::shared_ptr<RowsetReader>* result) = 0; + // Split range denoted by `start_key` and `end_key` into sub-ranges, each contains roughly // `request_block_row_count` rows. Sub-range is represented by pair of OlapTuples and added to `ranges`. 
// diff --git a/be/src/olap/rowset/segment_reader.cpp b/be/src/olap/rowset/segment_reader.cpp index b6dddbf..4dcf7cd 100644 --- a/be/src/olap/rowset/segment_reader.cpp +++ b/be/src/olap/rowset/segment_reader.cpp @@ -32,42 +32,37 @@ namespace doris { static const uint32_t MIN_FILTER_BLOCK_NUM = 10; -SegmentReader::SegmentReader( - const std::string file, - SegmentGroup* segment_group, - uint32_t segment_id, - const std::vector<uint32_t>& used_columns, - const std::set<uint32_t>& load_bf_columns, - const Conditions* conditions, - const DeleteHandler* delete_handler, - const DelCondSatisfied delete_status, - Cache* lru_cache, - RuntimeState* runtime_state, - OlapReaderStatistics* stats) : - _file_name(file), - _segment_group(segment_group), - _segment_id(segment_id), - _used_columns(used_columns), - _load_bf_columns(load_bf_columns), - _conditions(conditions), - _delete_handler(delete_handler), - _delete_status(delete_status), - _eof(false), - _end_block(-1), - // 确保第一次调用_move_to_next_row,会执行seek_to_block - _block_count(0), - _num_rows_in_block(0), - _null_supported(false), - _mmap_buffer(NULL), - _include_blocks(NULL), - _is_using_mmap(false), - _is_data_loaded(false), - _buffer_size(0), - _shared_buffer(NULL), - _lru_cache(lru_cache), - _runtime_state(runtime_state), - _stats(stats) { - _tracker.reset(new MemTracker(-1)); +SegmentReader::SegmentReader(const std::string file, SegmentGroup* segment_group, + uint32_t segment_id, const std::vector<uint32_t>& used_columns, + const std::set<uint32_t>& load_bf_columns, + const Conditions* conditions, const DeleteHandler* delete_handler, + const DelCondSatisfied delete_status, Cache* lru_cache, + RuntimeState* runtime_state, OlapReaderStatistics* stats, + MemTracker* parent_tracker) + : _file_name(file), + _segment_group(segment_group), + _segment_id(segment_id), + _used_columns(used_columns), + _load_bf_columns(load_bf_columns), + _conditions(conditions), + _delete_handler(delete_handler), + _delete_status(delete_status), 
+ _eof(false), + _end_block(-1), + // 确保第一次调用_move_to_next_row,会执行seek_to_block + _block_count(0), + _num_rows_in_block(0), + _null_supported(false), + _mmap_buffer(NULL), + _include_blocks(NULL), + _is_using_mmap(false), + _is_data_loaded(false), + _buffer_size(0), + _shared_buffer(NULL), + _lru_cache(lru_cache), + _runtime_state(runtime_state), + _stats(stats) { + _tracker.reset(new MemTracker(-1, "SegmentReader", parent_tracker, true)); _mem_pool.reset(new MemPool(_tracker.get())); } diff --git a/be/src/olap/rowset/segment_reader.h b/be/src/olap/rowset/segment_reader.h index 542fbe7..081ed8b 100644 --- a/be/src/olap/rowset/segment_reader.h +++ b/be/src/olap/rowset/segment_reader.h @@ -48,17 +48,12 @@ namespace doris { // SegmentReader 用于读取一个Segment文件 class SegmentReader { public: - explicit SegmentReader(const std::string file, - SegmentGroup* segment_group, - uint32_t segment_id, - const std::vector<uint32_t>& used_columns, - const std::set<uint32_t>& load_bf_columns, - const Conditions* conditions, - const DeleteHandler* delete_handler, - const DelCondSatisfied delete_status, - Cache* lru_cache, - RuntimeState* runtime_state, - OlapReaderStatistics* stats); + SegmentReader(const std::string file, SegmentGroup* segment_group, uint32_t segment_id, + const std::vector<uint32_t>& used_columns, + const std::set<uint32_t>& load_bf_columns, const Conditions* conditions, + const DeleteHandler* delete_handler, const DelCondSatisfied delete_status, + Cache* lru_cache, RuntimeState* runtime_state, OlapReaderStatistics* stats, + MemTracker* parent_tracker = nullptr); ~SegmentReader(); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index dffc0de..09ddb09 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -18,6 +18,7 @@ #include "olap/storage_engine.h" #include <signal.h> +#include <sys/syscall.h> #include <algorithm> #include <cstdio> @@ -103,18 +104,19 @@ Status StorageEngine::open(const EngineOptions& 
options, StorageEngine** engine_ StorageEngine::StorageEngine(const EngineOptions& options) : _options(options), - _available_storage_medium_type_count(0), - _effective_cluster_id(-1), - _is_all_cluster_id_exist(true), - _index_stream_lru_cache(NULL), - _file_cache(nullptr), - _tablet_manager(new TabletManager(config::tablet_map_shard_size)), - _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)), - _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), - _memtable_flush_executor(nullptr), - _block_manager(nullptr), - _default_rowset_type(ALPHA_ROWSET), - _heartbeat_flags(nullptr) { + _available_storage_medium_type_count(0), + _effective_cluster_id(-1), + _is_all_cluster_id_exist(true), + _index_stream_lru_cache(NULL), + _file_cache(nullptr), + _compaction_mem_tracker(-1, "compaction mem tracker(unlimited)"), + _tablet_manager(new TabletManager(config::tablet_map_shard_size)), + _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)), + _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), + _memtable_flush_executor(nullptr), + _block_manager(nullptr), + _default_rowset_type(ALPHA_ROWSET), + _heartbeat_flags(nullptr) { if (_s_instance == nullptr) { _s_instance = this; } @@ -122,6 +124,11 @@ StorageEngine::StorageEngine(const EngineOptions& options) MutexLock lock(&_gc_mutex); return _unused_rowsets.size(); }); + REGISTER_GAUGE_DORIS_METRIC(compaction_mem_current_consumption, [this]() { + return _compaction_mem_tracker.consumption(); + // We can get each compaction's detail usage + LOG(INFO) << _compaction_mem_tracker.LogUsage(2); + }); } StorageEngine::~StorageEngine() { @@ -530,7 +537,9 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); DorisMetrics::instance()->cumulative_compaction_request_total.increment(1); - CumulativeCompaction cumulative_compaction(best_tablet); + + 
std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid)); + CumulativeCompaction cumulative_compaction(best_tablet, tracker_label, &_compaction_mem_tracker); OLAPStatus res = cumulative_compaction.compact(); if (res != OLAP_SUCCESS) { @@ -564,7 +573,9 @@ void StorageEngine::_perform_base_compaction(DataDir* data_dir) { TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); DorisMetrics::instance()->base_compaction_request_total.increment(1); - BaseCompaction base_compaction(best_tablet); + + std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid)); + BaseCompaction base_compaction(best_tablet, tracker_label, &_compaction_mem_tracker); OLAPStatus res = base_compaction.compact(); if (res != OLAP_SUCCESS) { best_tablet->set_last_base_compaction_failure_time(UnixMillis()); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 2bc0c75..d8d97dd 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -316,6 +316,8 @@ private: // map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets; + MemTracker _compaction_mem_tracker; + bool _stop_bg_worker = false; std::thread _unused_rowset_monitor_thread; // thread to monitor snapshot expiry diff --git a/be/src/runtime/initial_reservations.cc b/be/src/runtime/initial_reservations.cc index 9c2bd7f..1a279f4 100644 --- a/be/src/runtime/initial_reservations.cc +++ b/be/src/runtime/initial_reservations.cc @@ -40,7 +40,7 @@ InitialReservations::InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation, MemTracker* query_mem_tracker, int64_t initial_reservation_total_claims) : initial_reservation_mem_tracker_(obj_pool->add( - new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))), + new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false, false))), 
remaining_initial_reservation_claims_(initial_reservation_total_claims) { initial_reservations_.InitChildTracker(nullptr, query_reservation, initial_reservation_mem_tracker_, numeric_limits<int64_t>::max()); diff --git a/be/src/runtime/vectorized_row_batch.cpp b/be/src/runtime/vectorized_row_batch.cpp index ae371e7..60b309c 100644 --- a/be/src/runtime/vectorized_row_batch.cpp +++ b/be/src/runtime/vectorized_row_batch.cpp @@ -22,15 +22,14 @@ namespace doris { -VectorizedRowBatch::VectorizedRowBatch( - const TabletSchema* schema, - const std::vector<uint32_t>& cols, - int capacity) - : _schema(schema), _cols(cols), _capacity(capacity), _limit(capacity) { +VectorizedRowBatch::VectorizedRowBatch(const TabletSchema* schema, + const std::vector<uint32_t>& cols, int capacity, + MemTracker* parent_tracker) + : _schema(schema), _cols(cols), _capacity(capacity), _limit(capacity) { _selected_in_use = false; _size = 0; - _tracker.reset(new MemTracker(-1)); + _tracker.reset(new MemTracker(-1, "VectorizedRowBatch", parent_tracker, true)); _mem_pool.reset(new MemPool(_tracker.get())); _selected = reinterpret_cast<uint16_t*>(new char[sizeof(uint16_t) * _capacity]); diff --git a/be/src/runtime/vectorized_row_batch.h b/be/src/runtime/vectorized_row_batch.h index d4959a6..aef23ae 100644 --- a/be/src/runtime/vectorized_row_batch.h +++ b/be/src/runtime/vectorized_row_batch.h @@ -72,10 +72,8 @@ private: class VectorizedRowBatch { public: - VectorizedRowBatch( - const TabletSchema* schema, - const std::vector<uint32_t>& cols, - int capacity); + VectorizedRowBatch(const TabletSchema* schema, const std::vector<uint32_t>& cols, int capacity, + MemTracker* parent_tracker = nullptr); ~VectorizedRowBatch() { for (auto vec: _col_vectors) { diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 1dc7fc9..229b764 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -20,8 +20,8 @@ #include <set> #include <string> -#include <vector> #include 
<unordered_map> +#include <vector> #include "util/metrics.h" #include "util/system_metrics.h" @@ -46,11 +46,10 @@ private: std::unordered_map<std::string, std::unique_ptr<IntGauge>> metrics; }; -#define REGISTER_GAUGE_DORIS_METRIC(name, func) \ - DorisMetrics::instance()->metrics()->register_metric(#name, &DorisMetrics::instance()->name); \ - DorisMetrics::instance()->metrics()->register_hook(#name, [&]() { \ - DorisMetrics::instance()->name.set_value(func()); \ -}); +#define REGISTER_GAUGE_DORIS_METRIC(name, func) \ + DorisMetrics::instance()->metrics()->register_metric(#name, &DorisMetrics::instance()->name); \ + DorisMetrics::instance()->metrics()->register_hook( \ + #name, [&]() { DorisMetrics::instance()->name.set_value(func()); }); class DorisMetrics { public: @@ -185,6 +184,7 @@ public: METRIC_DEFINE_UINT_GAUGE(stream_load_pipe_count, MetricUnit::NOUNIT); METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NOUNIT); METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NOUNIT); + METRIC_DEFINE_UINT_GAUGE(compaction_mem_current_consumption, MetricUnit::BYTES); static DorisMetrics* instance() { static DorisMetrics instance; @@ -193,10 +193,10 @@ public: // not thread-safe, call before calling metrics void initialize( - const std::vector<std::string>& paths = std::vector<std::string>(), - bool init_system_metrics = false, - const std::set<std::string>& disk_devices = std::set<std::string>(), - const std::vector<std::string>& network_interfaces = std::vector<std::string>()); + const std::vector<std::string>& paths = std::vector<std::string>(), + bool init_system_metrics = false, + const std::set<std::string>& disk_devices = std::set<std::string>(), + const std::vector<std::string>& network_interfaces = std::vector<std::string>()); MetricRegistry* metrics() { return &_metrics; } SystemMetrics* system_metrics() { return &_system_metrics; } @@ -217,6 +217,6 @@ private: SystemMetrics _system_metrics; }; -}; +}; // namespace doris #endif diff --git 
a/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md b/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md index a2716a7..4fd45d4 100644 --- a/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md +++ b/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md @@ -75,3 +75,10 @@ Value of the `Tcp: OutSegs` field in `/proc/net/snmp`. Represents the number of Use `(NEW_tcp_retrans_segs - OLD_tcp_retrans_segs) / (NEW_tcp_out_segs - OLD_tcp_out_segs)` can calculate the retrans rate of TCP packets. Usually used to troubleshoot network problems. + +### `doris_be_compaction_mem_current_consumption` + +The total MemPool consumption of all running `Compaction` threads. With this value, you can quickly tell whether +Compactions are using too much memory, which may lead to high memory usage or even OOM. + +Usually used to troubleshoot memory problems. \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md b/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md index 41533b8..db555ef 100644 --- a/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md +++ b/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md @@ -75,3 +75,10 @@ BE 的监控项可以通过以下方式访问: 通过 `(NEW_tcp_tcp_retrans_segs - OLD_tcp_retrans_segs) / (NEW_tcp_out_segs - OLD_tcp_out_segs)` 可以计算 TCP 重传率。 通常用于排查网络问题。 + +### `doris_be_compaction_mem_current_consumption` + +该监控项为Compaction使用的MemPool总和(所有Compaction线程)。通过该值,可以迅速判断Compaction是否占用过多内存,引起高内存占用 +甚至OOM等问题。 + +通常用于排查内存使用问题。 \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org