This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a01d1ae [Compaction] track RowsetReader's mem & add metric (#4068)
a01d1ae is described below
commit a01d1aec568247c681e26d3815421996f0c62544
Author: HuangWei <[email protected]>
AuthorDate: Fri Jul 24 07:58:09 2020 +0800
[Compaction] track RowsetReader's mem & add metric (#4068)
Ref
https://github.com/apache/incubator-doris/issues/3624#issuecomment-655933244
Only RowsetReaders used in compaction are tracked.
Other RowsetReaders won't be affected, because their parent_tracker is
nullptr.
---
be/src/olap/base_compaction.cpp | 6 +-
be/src/olap/base_compaction.h | 2 +-
be/src/olap/compaction.cpp | 17 +++---
be/src/olap/compaction.h | 7 ++-
be/src/olap/cumulative_compaction.cpp | 8 +--
be/src/olap/cumulative_compaction.h | 3 +-
be/src/olap/row_block.cpp | 4 +-
be/src/olap/row_block.h | 2 +-
be/src/olap/rowset/alpha_rowset.cpp | 12 +++-
be/src/olap/rowset/alpha_rowset.h | 27 ++++-----
be/src/olap/rowset/alpha_rowset_reader.cpp | 21 +++----
be/src/olap/rowset/alpha_rowset_reader.h | 6 +-
be/src/olap/rowset/beta_rowset.cpp | 6 ++
be/src/olap/rowset/beta_rowset.h | 13 +++--
be/src/olap/rowset/beta_rowset_reader.cpp | 6 +-
be/src/olap/rowset/beta_rowset_reader.h | 16 +++---
be/src/olap/rowset/column_data.cpp | 23 ++++----
be/src/olap/rowset/column_data.h | 5 +-
be/src/olap/rowset/rowset.h | 5 ++
be/src/olap/rowset/segment_reader.cpp | 67 ++++++++++------------
be/src/olap/rowset/segment_reader.h | 17 ++----
be/src/olap/storage_engine.cpp | 39 ++++++++-----
be/src/olap/storage_engine.h | 2 +
be/src/runtime/initial_reservations.cc | 2 +-
be/src/runtime/vectorized_row_batch.cpp | 11 ++--
be/src/runtime/vectorized_row_batch.h | 6 +-
be/src/util/doris_metrics.h | 22 +++----
.../operation/monitor-metrics/be-metrics.md | 7 +++
.../operation/monitor-metrics/be-metrics.md | 7 +++
29 files changed, 211 insertions(+), 158 deletions(-)
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index e155335..0ccb4ca 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -21,9 +21,9 @@
namespace doris {
-BaseCompaction::BaseCompaction(TabletSharedPtr tablet)
- : Compaction(tablet)
-{ }
+BaseCompaction::BaseCompaction(TabletSharedPtr tablet, const std::string&
label,
+ MemTracker* parent_tracker)
+ : Compaction(tablet, label, parent_tracker) {}
BaseCompaction::~BaseCompaction() { }
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index ff42aa5..9ea54a9 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -29,7 +29,7 @@ namespace doris {
class BaseCompaction : public Compaction {
public:
- BaseCompaction(TabletSharedPtr tablet);
+ BaseCompaction(TabletSharedPtr tablet, const std::string& label,
MemTracker* parent_tracker);
~BaseCompaction() override;
OLAPStatus compact() override;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index adb3b6a..5a74671 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -27,12 +27,13 @@ namespace doris {
Semaphore Compaction::_concurrency_sem;
-Compaction::Compaction(TabletSharedPtr tablet)
- : _tablet(tablet),
- _input_rowsets_size(0),
- _input_row_num(0),
- _state(CompactionState::INITED)
-{ }
+Compaction::Compaction(TabletSharedPtr tablet, const std::string& label,
MemTracker* parent_tracker)
+ : _mem_tracker(-1, label, parent_tracker, true),
+ _readers_tracker(-1, "readers tracker", &_mem_tracker, true),
+ _tablet(tablet),
+ _input_rowsets_size(0),
+ _input_row_num(0),
+ _state(CompactionState::INITED) {}
Compaction::~Compaction() {}
@@ -74,6 +75,7 @@ OLAPStatus Compaction::do_compaction_impl() {
TRACE("prepare finished");
// 2. write merged rows to output rowset
+ // The test results show that merger is low-memory-footprint, there is no
need to tracker its mem pool
Merger::Statistics stats;
auto res = Merger::merge_rowsets(_tablet, compaction_type(),
_input_rs_readers, _output_rs_writer.get(), &stats);
if (res != OLAP_SUCCESS) {
@@ -143,6 +145,7 @@ OLAPStatus Compaction::construct_output_rowset_writer() {
context.version = _output_version;
context.version_hash = _output_version_hash;
context.segments_overlap = NONOVERLAPPING;
+ // The test results show that one rs writer is low-memory-footprint, there
is no need to tracker its mem pool
RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context,
&_output_rs_writer));
return OLAP_SUCCESS;
}
@@ -150,7 +153,7 @@ OLAPStatus Compaction::construct_output_rowset_writer() {
OLAPStatus Compaction::construct_input_rowset_readers() {
for (auto& rowset : _input_rowsets) {
RowsetReaderSharedPtr rs_reader;
- RETURN_NOT_OK(rowset->create_reader(&rs_reader));
+ RETURN_NOT_OK(rowset->create_reader(&_readers_tracker, &rs_reader));
_input_rs_readers.push_back(std::move(rs_reader));
}
return OLAP_SUCCESS;
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 9837ac5..9507cc2 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -44,7 +44,7 @@ class Merger;
// 4. gc unused rowstes
class Compaction {
public:
- Compaction(TabletSharedPtr tablet);
+ Compaction(TabletSharedPtr tablet, const std::string& label, MemTracker*
parent_tracker);
virtual ~Compaction();
virtual OLAPStatus compact() = 0;
@@ -77,6 +77,11 @@ private:
int64_t _get_input_num_rows_from_seg_grps();
protected:
+ // the root tracker for this compaction
+ MemTracker _mem_tracker;
+
+ // the child of root, only track rowset readers mem
+ MemTracker _readers_tracker;
TabletSharedPtr _tablet;
std::vector<RowsetSharedPtr> _input_rowsets;
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index dddf02c..3494561 100755
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -22,10 +22,10 @@
namespace doris {
-CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet)
- : Compaction(tablet),
-
_cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes)
-{ }
+CumulativeCompaction::CumulativeCompaction(TabletSharedPtr tablet, const
std::string& label,
+ MemTracker* parent_tracker)
+ : Compaction(tablet, label, parent_tracker),
+
_cumulative_rowset_size_threshold(config::cumulative_compaction_budgeted_bytes)
{}
CumulativeCompaction::~CumulativeCompaction() { }
diff --git a/be/src/olap/cumulative_compaction.h
b/be/src/olap/cumulative_compaction.h
index 08b650e..49e5f2c 100755
--- a/be/src/olap/cumulative_compaction.h
+++ b/be/src/olap/cumulative_compaction.h
@@ -26,7 +26,8 @@ namespace doris {
class CumulativeCompaction : public Compaction {
public:
- CumulativeCompaction(TabletSharedPtr tablet);
+ CumulativeCompaction(TabletSharedPtr tablet, const std::string& label,
+ MemTracker* parent_tracker);
~CumulativeCompaction() override;
OLAPStatus compact() override;
diff --git a/be/src/olap/row_block.cpp b/be/src/olap/row_block.cpp
index aa00e59..09cf48b 100644
--- a/be/src/olap/row_block.cpp
+++ b/be/src/olap/row_block.cpp
@@ -37,10 +37,10 @@ using std::vector;
namespace doris {
-RowBlock::RowBlock(const TabletSchema* schema) :
+RowBlock::RowBlock(const TabletSchema* schema, MemTracker* parent_tracker) :
_capacity(0),
_schema(schema) {
- _tracker.reset(new MemTracker(-1));
+ _tracker.reset(new MemTracker(-1, "RowBlock", parent_tracker, true));
_mem_pool.reset(new MemPool(_tracker.get()));
}
diff --git a/be/src/olap/row_block.h b/be/src/olap/row_block.h
index cb99128..c9d277f 100644
--- a/be/src/olap/row_block.h
+++ b/be/src/olap/row_block.h
@@ -56,7 +56,7 @@ class RowBlock {
friend class RowBlockChanger;
friend class VectorizedRowBatch;
public:
- RowBlock(const TabletSchema* schema);
+ RowBlock(const TabletSchema* schema, MemTracker* parent_tracker = nullptr);
// 注意回收内部buffer
~RowBlock();
diff --git a/be/src/olap/rowset/alpha_rowset.cpp
b/be/src/olap/rowset/alpha_rowset.cpp
index 8292913..337a049 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -52,8 +52,16 @@ OLAPStatus AlphaRowset::do_load(bool use_cache) {
}
OLAPStatus AlphaRowset::create_reader(std::shared_ptr<RowsetReader>* result) {
- result->reset(new AlphaRowsetReader(
- _schema->num_rows_per_row_block(),
std::static_pointer_cast<AlphaRowset>(shared_from_this())));
+ result->reset(new AlphaRowsetReader(_schema->num_rows_per_row_block(),
+
std::static_pointer_cast<AlphaRowset>(shared_from_this())));
+ return OLAP_SUCCESS;
+}
+
+OLAPStatus AlphaRowset::create_reader(MemTracker* parent_tracker,
+ std::shared_ptr<RowsetReader>* result) {
+ result->reset(new AlphaRowsetReader(_schema->num_rows_per_row_block(),
+
std::static_pointer_cast<AlphaRowset>(shared_from_this()),
+ parent_tracker));
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/rowset/alpha_rowset.h
b/be/src/olap/rowset/alpha_rowset.h
index 1d4f8a9..bff9618 100644
--- a/be/src/olap/rowset/alpha_rowset.h
+++ b/be/src/olap/rowset/alpha_rowset.h
@@ -18,14 +18,14 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_H
#define DORIS_BE_SRC_OLAP_ROWSET_ALPHA_ROWSET_H
+#include "olap/data_dir.h"
#include "olap/rowset/rowset.h"
-#include "olap/rowset/segment_group.h"
#include "olap/rowset/rowset_meta.h"
-#include "olap/data_dir.h"
+#include "olap/rowset/segment_group.h"
#include "olap/tuple.h"
-#include <vector>
#include <memory>
+#include <vector>
namespace doris {
@@ -42,8 +42,10 @@ public:
OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) override;
- OLAPStatus split_range(const RowCursor& start_key,
- const RowCursor& end_key,
+ OLAPStatus create_reader(MemTracker* parent_tracker,
+ std::shared_ptr<RowsetReader>* result) override;
+
+ OLAPStatus split_range(const RowCursor& start_key, const RowCursor&
end_key,
uint64_t request_block_row_count,
std::vector<OlapTuple>* ranges) override;
@@ -54,13 +56,13 @@ public:
OLAPStatus copy_files_to(const std::string& dir) override;
OLAPStatus convert_from_old_files(const std::string& snapshot_path,
- std::vector<std::string>* success_files);
+ std::vector<std::string>* success_files);
- OLAPStatus convert_to_old_files(const std::string& snapshot_path,
- std::vector<std::string>* success_files);
+ OLAPStatus convert_to_old_files(const std::string& snapshot_path,
+ std::vector<std::string>* success_files);
OLAPStatus remove_old_files(std::vector<std::string>* files_to_remove)
override;
-
+
bool check_path(const std::string& path) override;
// when convert from old be, should set row num, index size, data size
@@ -70,16 +72,15 @@ public:
protected:
friend class RowsetFactory;
- AlphaRowset(const TabletSchema* schema,
- std::string rowset_path,
+ AlphaRowset(const TabletSchema* schema, std::string rowset_path,
RowsetMetaSharedPtr rowset_meta);
-
+
// init segment groups
OLAPStatus init() override;
OLAPStatus do_load(bool use_cache) override;
- void do_close() override { }
+ void do_close() override {}
// add custom logic when rowset is published
void make_visible_extra(Version version, VersionHash version_hash)
override;
diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp
b/be/src/olap/rowset/alpha_rowset_reader.cpp
index 278576a..906eeeb 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.cpp
+++ b/be/src/olap/rowset/alpha_rowset_reader.cpp
@@ -21,14 +21,15 @@
namespace doris {
-AlphaRowsetReader::AlphaRowsetReader(
- int num_rows_per_row_block,
- AlphaRowsetSharedPtr rowset)
- : _num_rows_per_row_block(num_rows_per_row_block),
- _rowset(std::move(rowset)),
-
_alpha_rowset_meta(std::static_pointer_cast<AlphaRowsetMeta>(_rowset->rowset_meta()).get()),
- _segment_groups(_rowset->_segment_groups),
- _key_range_size(0) {
+AlphaRowsetReader::AlphaRowsetReader(int num_rows_per_row_block,
AlphaRowsetSharedPtr rowset,
+ MemTracker* parent_tracker)
+ : _num_rows_per_row_block(num_rows_per_row_block),
+ _rowset(std::move(rowset)),
+ _parent_tracker(parent_tracker),
+ _alpha_rowset_meta(
+
std::static_pointer_cast<AlphaRowsetMeta>(_rowset->rowset_meta()).get()),
+ _segment_groups(_rowset->_segment_groups),
+ _key_range_size(0) {
_rowset->aquire();
}
@@ -57,7 +58,7 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext*
read_context) {
// 2) we have several segment groups (_is_segments_overlapping &&
_merge_ctxs.size() > 1)
if (_current_read_context->need_ordered_result && _is_segments_overlapping
&& _merge_ctxs.size() > 1) {
_next_block = &AlphaRowsetReader::_merge_block;
- _read_block.reset(new (std::nothrow)
RowBlock(_current_read_context->tablet_schema));
+ _read_block.reset(new (std::nothrow)
RowBlock(_current_read_context->tablet_schema, _parent_tracker));
if (_read_block == nullptr) {
LOG(WARNING) << "new row block failed in reader";
return OLAP_ERR_MALLOC_ERROR;
@@ -332,7 +333,7 @@ OLAPStatus
AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context
const bool use_index_stream_cache = read_context->reader_type ==
READER_QUERY;
for (auto& segment_group : _segment_groups) {
- std::unique_ptr<ColumnData>
new_column_data(ColumnData::create(segment_group.get()));
+ std::unique_ptr<ColumnData>
new_column_data(ColumnData::create(segment_group.get(), _parent_tracker));
OLAPStatus status = new_column_data->init();
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "init column data failed";
diff --git a/be/src/olap/rowset/alpha_rowset_reader.h
b/be/src/olap/rowset/alpha_rowset_reader.h
index 3f3eb58..222523a 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.h
+++ b/be/src/olap/rowset/alpha_rowset_reader.h
@@ -52,7 +52,8 @@ struct AlphaMergeContextComparator {
class AlphaRowsetReader : public RowsetReader {
public:
- AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetSharedPtr rowset);
+ AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetSharedPtr rowset,
+ MemTracker* parent_tracker = nullptr);
~AlphaRowsetReader() override;
@@ -60,6 +61,8 @@ public:
OLAPStatus init(RowsetReaderContext* read_context) override;
// read next block data
+ // If parent_tracker is not null, the block we get from next_block() will
have the parent_tracker.
+ // It's ok, because we only get ref here, the block's owner is this reader.
OLAPStatus next_block(RowBlock** block) override;
bool delete_flag() override;
@@ -101,6 +104,7 @@ private:
private:
int _num_rows_per_row_block;
AlphaRowsetSharedPtr _rowset;
+ MemTracker* _parent_tracker;
std::string _rowset_path;
AlphaRowsetMeta* _alpha_rowset_meta;
const std::vector<std::shared_ptr<SegmentGroup>>& _segment_groups;
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 6207034..784872a 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -67,6 +67,12 @@ OLAPStatus BetaRowset::create_reader(RowsetReaderSharedPtr*
result) {
return OLAP_SUCCESS;
}
+OLAPStatus BetaRowset::create_reader(MemTracker* parent_tracker,
std::shared_ptr<RowsetReader>* result) {
+ // NOTE: We use std::static_pointer_cast for performance
+ result->reset(new
BetaRowsetReader(std::static_pointer_cast<BetaRowset>(shared_from_this()),
parent_tracker));
+ return OLAP_SUCCESS;
+}
+
OLAPStatus BetaRowset::split_range(const RowCursor& start_key,
const RowCursor& end_key,
uint64_t request_block_row_count,
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 58ae2e0..6704fd5 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -39,11 +39,13 @@ public:
OLAPStatus create_reader(RowsetReaderSharedPtr* result) override;
- static std::string segment_file_path(
- const std::string& segment_dir, const RowsetId& rowset_id, int
segment_id);
+ OLAPStatus create_reader(MemTracker* parent_tracker,
+ std::shared_ptr<RowsetReader>* result) override;
- OLAPStatus split_range(const RowCursor& start_key,
- const RowCursor& end_key,
+ static std::string segment_file_path(const std::string& segment_dir, const
RowsetId& rowset_id,
+ int segment_id);
+
+ OLAPStatus split_range(const RowCursor& start_key, const RowCursor&
end_key,
uint64_t request_block_row_count,
std::vector<OlapTuple>* ranges) override;
@@ -61,8 +63,7 @@ public:
bool check_path(const std::string& path) override;
protected:
- BetaRowset(const TabletSchema* schema,
- std::string rowset_path,
+ BetaRowset(const TabletSchema* schema, std::string rowset_path,
RowsetMetaSharedPtr rowset_meta);
// init segment groups
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 148de34..dbc2ea1 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -27,8 +27,8 @@
namespace doris {
-BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset)
- : _rowset(std::move(rowset)), _stats(&_owned_stats) {
+BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset, MemTracker*
parent_tracker)
+ : _rowset(std::move(rowset)), _stats(&_owned_stats),
_parent_tracker(parent_tracker) {
_rowset->aquire();
}
@@ -99,7 +99,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
_input_block.reset(new RowBlockV2(schema, 1024));
// init output block and row
- _output_block.reset(new RowBlock(read_context->tablet_schema));
+ _output_block.reset(new RowBlock(read_context->tablet_schema,
_parent_tracker));
RowBlockInfo output_block_info;
output_block_info.row_num = 1024;
output_block_info.null_supported = true;
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index ae5b687..3683540 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -18,10 +18,10 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H
#define DORIS_BE_SRC_OLAP_ROWSET_BETA_ROWSET_READER_H
+#include "olap/iterators.h"
#include "olap/row_block.h"
#include "olap/row_block2.h"
#include "olap/row_cursor.h"
-#include "olap/iterators.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_reader.h"
@@ -29,14 +29,14 @@ namespace doris {
class BetaRowsetReader : public RowsetReader {
public:
- explicit BetaRowsetReader(BetaRowsetSharedPtr rowset);
+ BetaRowsetReader(BetaRowsetSharedPtr rowset, MemTracker* parent_tracker =
nullptr);
- ~BetaRowsetReader() override {
- _rowset->release();
- }
+ ~BetaRowsetReader() override { _rowset->release(); }
OLAPStatus init(RowsetReaderContext* read_context) override;
+ // If parent_tracker is not null, the block we get from next_block() will
have the parent_tracker.
+ // It's ok, because we only get ref here, the block's owner is this reader.
OLAPStatus next_block(RowBlock** block) override;
bool delete_flag() override { return _rowset->delete_flag(); }
@@ -47,9 +47,7 @@ public:
RowsetSharedPtr rowset() override { return
std::dynamic_pointer_cast<Rowset>(_rowset); }
- int64_t filtered_rows() override {
- return _stats->rows_del_filtered;
- }
+ int64_t filtered_rows() override { return _stats->rows_del_filtered; }
private:
BetaRowsetSharedPtr _rowset;
@@ -58,6 +56,8 @@ private:
OlapReaderStatistics _owned_stats;
OlapReaderStatistics* _stats;
+ MemTracker* _parent_tracker;
+
std::unique_ptr<RowwiseIterator> _iterator;
std::unique_ptr<RowBlockV2> _input_block;
diff --git a/be/src/olap/rowset/column_data.cpp
b/be/src/olap/rowset/column_data.cpp
index 080564d..e330255 100644
--- a/be/src/olap/rowset/column_data.cpp
+++ b/be/src/olap/rowset/column_data.cpp
@@ -24,13 +24,14 @@
namespace doris {
-ColumnData* ColumnData::create(SegmentGroup* segment_group) {
- ColumnData* data = new(std::nothrow) ColumnData(segment_group);
+ColumnData* ColumnData::create(SegmentGroup* segment_group, MemTracker*
parent_tracker) {
+ ColumnData* data = new (std::nothrow) ColumnData(segment_group,
parent_tracker);
return data;
}
-ColumnData::ColumnData(SegmentGroup* segment_group)
+ColumnData::ColumnData(SegmentGroup* segment_group, MemTracker* parent_tracker)
: _segment_group(segment_group),
+ _parent_tracker(parent_tracker),
_eof(false),
_conditions(nullptr),
_col_predicates(nullptr),
@@ -135,7 +136,7 @@ OLAPStatus ColumnData::_seek_to_block(const
RowBlockPosition& block_pos, bool wi
_segment_reader = new(std::nothrow) SegmentReader(
file_name, segment_group(), block_pos.segment,
_seek_columns, _load_bf_columns, _conditions,
- _delete_handler, _delete_status, _lru_cache, _runtime_state,
_stats);
+ _delete_handler, _delete_status, _lru_cache, _runtime_state,
_stats, _parent_tracker);
if (_segment_reader == nullptr) {
OLAP_LOG_WARNING("fail to malloc segment reader.");
return OLAP_ERR_MALLOC_ERROR;
@@ -435,13 +436,15 @@ void ColumnData::set_read_params(
LOG(WARNING) << "fail to init row_cursor";
}
- _read_vector_batch.reset(new VectorizedRowBatch(
- &(_segment_group->get_tablet_schema()), _return_columns,
_num_rows_per_block));
+ _read_vector_batch.reset(new
VectorizedRowBatch(&(_segment_group->get_tablet_schema()),
+ _return_columns,
_num_rows_per_block,
+ _parent_tracker));
- _seek_vector_batch.reset(new VectorizedRowBatch(
- &(_segment_group->get_tablet_schema()), _seek_columns,
_num_rows_per_block));
+ _seek_vector_batch.reset(new
VectorizedRowBatch(&(_segment_group->get_tablet_schema()),
+ _seek_columns,
_num_rows_per_block,
+ _parent_tracker));
- _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema())));
+ _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema()),
_parent_tracker));
RowBlockInfo block_info;
block_info.row_num = _num_rows_per_block;
block_info.null_supported = true;
@@ -578,7 +581,7 @@ OLAPStatus ColumnData::schema_change_init() {
_read_vector_batch.reset(new VectorizedRowBatch(
&(_segment_group->get_tablet_schema()), _return_columns,
_num_rows_per_block));
- _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema())));
+ _read_block.reset(new RowBlock(&(_segment_group->get_tablet_schema()),
_parent_tracker));
RowBlockInfo block_info;
block_info.row_num = _num_rows_per_block;
diff --git a/be/src/olap/rowset/column_data.h b/be/src/olap/rowset/column_data.h
index 06b9a8c..3363a6c 100644
--- a/be/src/olap/rowset/column_data.h
+++ b/be/src/olap/rowset/column_data.h
@@ -40,8 +40,8 @@ class SegmentReader;
// This class is column data reader. this class will be used in two case.
class ColumnData {
public:
- static ColumnData* create(SegmentGroup* segment_group);
- explicit ColumnData(SegmentGroup* segment_group);
+ static ColumnData* create(SegmentGroup* segment_group, MemTracker*
parent_tracker = nullptr);
+ ColumnData(SegmentGroup* segment_group, MemTracker* parent_tracker =
nullptr);
~ColumnData();
// 为了与之前兼容, 暴露部分index的接口
@@ -155,6 +155,7 @@ private:
}
private:
SegmentGroup* _segment_group;
+ MemTracker* _parent_tracker;
// 当到达文件末尾或者到达end key时设置此标志
bool _eof;
const Conditions* _conditions;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 2c509eb..07a1b9d 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -29,6 +29,7 @@
namespace doris {
class DataDir;
+class MemTracker;
class OlapTuple;
class RowCursor;
class Rowset;
@@ -118,6 +119,10 @@ public:
// returns OLAP_ERR_ROWSET_CREATE_READER when failed to create reader
virtual OLAPStatus create_reader(std::shared_ptr<RowsetReader>* result) =
0;
+ // Support adding parent tracker, but should be careful about destruction
sequence.
+ virtual OLAPStatus create_reader(MemTracker* parent_tracker,
+ std::shared_ptr<RowsetReader>* result) =
0;
+
// Split range denoted by `start_key` and `end_key` into sub-ranges, each
contains roughly
// `request_block_row_count` rows. Sub-range is represented by pair of
OlapTuples and added to `ranges`.
//
diff --git a/be/src/olap/rowset/segment_reader.cpp
b/be/src/olap/rowset/segment_reader.cpp
index b6dddbf..4dcf7cd 100644
--- a/be/src/olap/rowset/segment_reader.cpp
+++ b/be/src/olap/rowset/segment_reader.cpp
@@ -32,42 +32,37 @@ namespace doris {
static const uint32_t MIN_FILTER_BLOCK_NUM = 10;
-SegmentReader::SegmentReader(
- const std::string file,
- SegmentGroup* segment_group,
- uint32_t segment_id,
- const std::vector<uint32_t>& used_columns,
- const std::set<uint32_t>& load_bf_columns,
- const Conditions* conditions,
- const DeleteHandler* delete_handler,
- const DelCondSatisfied delete_status,
- Cache* lru_cache,
- RuntimeState* runtime_state,
- OlapReaderStatistics* stats) :
- _file_name(file),
- _segment_group(segment_group),
- _segment_id(segment_id),
- _used_columns(used_columns),
- _load_bf_columns(load_bf_columns),
- _conditions(conditions),
- _delete_handler(delete_handler),
- _delete_status(delete_status),
- _eof(false),
- _end_block(-1),
- // 确保第一次调用_move_to_next_row,会执行seek_to_block
- _block_count(0),
- _num_rows_in_block(0),
- _null_supported(false),
- _mmap_buffer(NULL),
- _include_blocks(NULL),
- _is_using_mmap(false),
- _is_data_loaded(false),
- _buffer_size(0),
- _shared_buffer(NULL),
- _lru_cache(lru_cache),
- _runtime_state(runtime_state),
- _stats(stats) {
- _tracker.reset(new MemTracker(-1));
+SegmentReader::SegmentReader(const std::string file, SegmentGroup*
segment_group,
+ uint32_t segment_id, const std::vector<uint32_t>&
used_columns,
+ const std::set<uint32_t>& load_bf_columns,
+ const Conditions* conditions, const
DeleteHandler* delete_handler,
+ const DelCondSatisfied delete_status, Cache*
lru_cache,
+ RuntimeState* runtime_state,
OlapReaderStatistics* stats,
+ MemTracker* parent_tracker)
+ : _file_name(file),
+ _segment_group(segment_group),
+ _segment_id(segment_id),
+ _used_columns(used_columns),
+ _load_bf_columns(load_bf_columns),
+ _conditions(conditions),
+ _delete_handler(delete_handler),
+ _delete_status(delete_status),
+ _eof(false),
+ _end_block(-1),
+ // 确保第一次调用_move_to_next_row,会执行seek_to_block
+ _block_count(0),
+ _num_rows_in_block(0),
+ _null_supported(false),
+ _mmap_buffer(NULL),
+ _include_blocks(NULL),
+ _is_using_mmap(false),
+ _is_data_loaded(false),
+ _buffer_size(0),
+ _shared_buffer(NULL),
+ _lru_cache(lru_cache),
+ _runtime_state(runtime_state),
+ _stats(stats) {
+ _tracker.reset(new MemTracker(-1, "SegmentReader", parent_tracker, true));
_mem_pool.reset(new MemPool(_tracker.get()));
}
diff --git a/be/src/olap/rowset/segment_reader.h
b/be/src/olap/rowset/segment_reader.h
index 542fbe7..081ed8b 100644
--- a/be/src/olap/rowset/segment_reader.h
+++ b/be/src/olap/rowset/segment_reader.h
@@ -48,17 +48,12 @@ namespace doris {
// SegmentReader 用于读取一个Segment文件
class SegmentReader {
public:
- explicit SegmentReader(const std::string file,
- SegmentGroup* segment_group,
- uint32_t segment_id,
- const std::vector<uint32_t>& used_columns,
- const std::set<uint32_t>& load_bf_columns,
- const Conditions* conditions,
- const DeleteHandler* delete_handler,
- const DelCondSatisfied delete_status,
- Cache* lru_cache,
- RuntimeState* runtime_state,
- OlapReaderStatistics* stats);
+ SegmentReader(const std::string file, SegmentGroup* segment_group,
uint32_t segment_id,
+ const std::vector<uint32_t>& used_columns,
+ const std::set<uint32_t>& load_bf_columns, const Conditions*
conditions,
+ const DeleteHandler* delete_handler, const DelCondSatisfied
delete_status,
+ Cache* lru_cache, RuntimeState* runtime_state,
OlapReaderStatistics* stats,
+ MemTracker* parent_tracker = nullptr);
~SegmentReader();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index dffc0de..09ddb09 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -18,6 +18,7 @@
#include "olap/storage_engine.h"
#include <signal.h>
+#include <sys/syscall.h>
#include <algorithm>
#include <cstdio>
@@ -103,18 +104,19 @@ Status StorageEngine::open(const EngineOptions& options,
StorageEngine** engine_
StorageEngine::StorageEngine(const EngineOptions& options)
: _options(options),
- _available_storage_medium_type_count(0),
- _effective_cluster_id(-1),
- _is_all_cluster_id_exist(true),
- _index_stream_lru_cache(NULL),
- _file_cache(nullptr),
- _tablet_manager(new TabletManager(config::tablet_map_shard_size)),
- _txn_manager(new TxnManager(config::txn_map_shard_size,
config::txn_shard_size)),
- _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
- _memtable_flush_executor(nullptr),
- _block_manager(nullptr),
- _default_rowset_type(ALPHA_ROWSET),
- _heartbeat_flags(nullptr) {
+ _available_storage_medium_type_count(0),
+ _effective_cluster_id(-1),
+ _is_all_cluster_id_exist(true),
+ _index_stream_lru_cache(NULL),
+ _file_cache(nullptr),
+ _compaction_mem_tracker(-1, "compaction mem tracker(unlimited)"),
+ _tablet_manager(new TabletManager(config::tablet_map_shard_size)),
+ _txn_manager(new TxnManager(config::txn_map_shard_size,
config::txn_shard_size)),
+ _rowset_id_generator(new
UniqueRowsetIdGenerator(options.backend_uid)),
+ _memtable_flush_executor(nullptr),
+ _block_manager(nullptr),
+ _default_rowset_type(ALPHA_ROWSET),
+ _heartbeat_flags(nullptr) {
if (_s_instance == nullptr) {
_s_instance = this;
}
@@ -122,6 +124,11 @@ StorageEngine::StorageEngine(const EngineOptions& options)
MutexLock lock(&_gc_mutex);
return _unused_rowsets.size();
});
+ REGISTER_GAUGE_DORIS_METRIC(compaction_mem_current_consumption, [this]() {
+ return _compaction_mem_tracker.consumption();
+ // We can get each compaction's detail usage
+ LOG(INFO) << _compaction_mem_tracker.LogUsage(2);
+ });
}
StorageEngine::~StorageEngine() {
@@ -530,7 +537,9 @@ void StorageEngine::_perform_cumulative_compaction(DataDir*
data_dir) {
TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);
DorisMetrics::instance()->cumulative_compaction_request_total.increment(1);
- CumulativeCompaction cumulative_compaction(best_tablet);
+
+ std::string tracker_label = "cumulative compaction " +
std::to_string(syscall(__NR_gettid));
+ CumulativeCompaction cumulative_compaction(best_tablet, tracker_label,
&_compaction_mem_tracker);
OLAPStatus res = cumulative_compaction.compact();
if (res != OLAP_SUCCESS) {
@@ -564,7 +573,9 @@ void StorageEngine::_perform_base_compaction(DataDir*
data_dir) {
TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id);
DorisMetrics::instance()->base_compaction_request_total.increment(1);
- BaseCompaction base_compaction(best_tablet);
+
+ std::string tracker_label = "base compaction " +
std::to_string(syscall(__NR_gettid));
+ BaseCompaction base_compaction(best_tablet, tracker_label,
&_compaction_mem_tracker);
OLAPStatus res = base_compaction.compact();
if (res != OLAP_SUCCESS) {
best_tablet->set_last_base_compaction_failure_time(UnixMillis());
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 2bc0c75..d8d97dd 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -316,6 +316,8 @@ private:
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we
need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
+ MemTracker _compaction_mem_tracker;
+
bool _stop_bg_worker = false;
std::thread _unused_rowset_monitor_thread;
// thread to monitor snapshot expiry
diff --git a/be/src/runtime/initial_reservations.cc
b/be/src/runtime/initial_reservations.cc
index 9c2bd7f..1a279f4 100644
--- a/be/src/runtime/initial_reservations.cc
+++ b/be/src/runtime/initial_reservations.cc
@@ -40,7 +40,7 @@ InitialReservations::InitialReservations(ObjectPool* obj_pool,
ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
int64_t initial_reservation_total_claims)
: initial_reservation_mem_tracker_(obj_pool->add(
- new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
+ new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false,
false))),
remaining_initial_reservation_claims_(initial_reservation_total_claims) {
initial_reservations_.InitChildTracker(nullptr, query_reservation,
initial_reservation_mem_tracker_, numeric_limits<int64_t>::max());
diff --git a/be/src/runtime/vectorized_row_batch.cpp
b/be/src/runtime/vectorized_row_batch.cpp
index ae371e7..60b309c 100644
--- a/be/src/runtime/vectorized_row_batch.cpp
+++ b/be/src/runtime/vectorized_row_batch.cpp
@@ -22,15 +22,14 @@
namespace doris {
-VectorizedRowBatch::VectorizedRowBatch(
- const TabletSchema* schema,
- const std::vector<uint32_t>& cols,
- int capacity)
- : _schema(schema), _cols(cols), _capacity(capacity),
_limit(capacity) {
+VectorizedRowBatch::VectorizedRowBatch(const TabletSchema* schema,
+ const std::vector<uint32_t>& cols, int
capacity,
+ MemTracker* parent_tracker)
+ : _schema(schema), _cols(cols), _capacity(capacity), _limit(capacity) {
_selected_in_use = false;
_size = 0;
- _tracker.reset(new MemTracker(-1));
+ _tracker.reset(new MemTracker(-1, "VectorizedRowBatch", parent_tracker,
true));
_mem_pool.reset(new MemPool(_tracker.get()));
_selected = reinterpret_cast<uint16_t*>(new char[sizeof(uint16_t) *
_capacity]);
diff --git a/be/src/runtime/vectorized_row_batch.h
b/be/src/runtime/vectorized_row_batch.h
index d4959a6..aef23ae 100644
--- a/be/src/runtime/vectorized_row_batch.h
+++ b/be/src/runtime/vectorized_row_batch.h
@@ -72,10 +72,8 @@ private:
class VectorizedRowBatch {
public:
- VectorizedRowBatch(
- const TabletSchema* schema,
- const std::vector<uint32_t>& cols,
- int capacity);
+ VectorizedRowBatch(const TabletSchema* schema, const
std::vector<uint32_t>& cols, int capacity,
+ MemTracker* parent_tracker = nullptr);
~VectorizedRowBatch() {
for (auto vec: _col_vectors) {
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 1dc7fc9..229b764 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -20,8 +20,8 @@
#include <set>
#include <string>
-#include <vector>
#include <unordered_map>
+#include <vector>
#include "util/metrics.h"
#include "util/system_metrics.h"
@@ -46,11 +46,10 @@ private:
std::unordered_map<std::string, std::unique_ptr<IntGauge>> metrics;
};
-#define REGISTER_GAUGE_DORIS_METRIC(name, func) \
- DorisMetrics::instance()->metrics()->register_metric(#name,
&DorisMetrics::instance()->name); \
- DorisMetrics::instance()->metrics()->register_hook(#name, [&]() { \
- DorisMetrics::instance()->name.set_value(func()); \
-});
+#define REGISTER_GAUGE_DORIS_METRIC(name, func)
\
+ DorisMetrics::instance()->metrics()->register_metric(#name,
&DorisMetrics::instance()->name); \
+ DorisMetrics::instance()->metrics()->register_hook(
\
+ #name, [&]() { DorisMetrics::instance()->name.set_value(func());
});
class DorisMetrics {
public:
@@ -185,6 +184,7 @@ public:
METRIC_DEFINE_UINT_GAUGE(stream_load_pipe_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(brpc_endpoint_stub_count, MetricUnit::NOUNIT);
METRIC_DEFINE_UINT_GAUGE(tablet_writer_count, MetricUnit::NOUNIT);
+ METRIC_DEFINE_UINT_GAUGE(compaction_mem_current_consumption,
MetricUnit::BYTES);
static DorisMetrics* instance() {
static DorisMetrics instance;
@@ -193,10 +193,10 @@ public:
// not thread-safe, call before calling metrics
void initialize(
- const std::vector<std::string>& paths = std::vector<std::string>(),
- bool init_system_metrics = false,
- const std::set<std::string>& disk_devices = std::set<std::string>(),
- const std::vector<std::string>& network_interfaces =
std::vector<std::string>());
+ const std::vector<std::string>& paths = std::vector<std::string>(),
+ bool init_system_metrics = false,
+ const std::set<std::string>& disk_devices =
std::set<std::string>(),
+ const std::vector<std::string>& network_interfaces =
std::vector<std::string>());
MetricRegistry* metrics() { return &_metrics; }
SystemMetrics* system_metrics() { return &_system_metrics; }
@@ -217,6 +217,6 @@ private:
SystemMetrics _system_metrics;
};
-};
+}; // namespace doris
#endif
diff --git
a/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md
b/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md
index a2716a7..4fd45d4 100644
--- a/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md
+++ b/docs/en/administrator-guide/operation/monitor-metrics/be-metrics.md
@@ -75,3 +75,10 @@ Value of the `Tcp: OutSegs` field in `/proc/net/snmp`.
Represents the number of
Use `(NEW_tcp_retrans_segs - OLD_tcp_retrans_segs) / (NEW_tcp_out_segs -
OLD_tcp_out_segs)` can calculate the retrans rate of TCP packets.
Usually used to troubleshoot network problems.
+
+### `doris_be_compaction_mem_current_consumption`
+
+The total MemPool consumption of all running `Compaction` threads. Using this
value, we can easily tell whether
+compactions are using too much memory, which may cause high memory usage or even OOM.
+
+Usually used to troubleshoot memory problems.
\ No newline at end of file
diff --git
a/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md
b/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md
index 41533b8..db555ef 100644
--- a/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md
+++ b/docs/zh-CN/administrator-guide/operation/monitor-metrics/be-metrics.md
@@ -75,3 +75,10 @@ BE 的监控项可以通过以下方式访问:
通过 `(NEW_tcp_tcp_retrans_segs - OLD_tcp_retrans_segs) / (NEW_tcp_out_segs -
OLD_tcp_out_segs)` 可以计算 TCP 重传率。
通常用于排查网络问题。
+
+### `doris_be_compaction_mem_current_consumption`
+
+该监控项为Compaction使用的MemPool总和(所有Compaction线程)。通过该值,可以迅速判断Compaction是否占用过多内存,引起高内存占用
+甚至OOM等问题。
+
+通常用于排查内存使用问题。
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]