This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2b965303d8f [branch-3.0] pick some bugfix (#42128) 2b965303d8f is described below commit 2b965303d8f92ffa66609264c300da1c71879242 Author: yiguolei <676222...@qq.com> AuthorDate: Sat Oct 19 18:19:28 2024 +0800 [branch-3.0] pick some bugfix (#42128) --- be/src/common/config.cpp | 2 - be/src/common/config.h | 2 - be/src/common/daemon.cpp | 1 + be/src/olap/memtable.cpp | 40 +++++------ be/src/olap/memtable.h | 37 +++++----- be/src/olap/memtable_flush_executor.cpp | 24 +++---- be/src/olap/memtable_flush_executor.h | 4 +- be/src/olap/memtable_memory_limiter.cpp | 4 +- be/src/olap/memtable_writer.cpp | 40 ++++------- be/src/olap/memtable_writer.h | 11 ++- be/src/olap/rowset/segment_v2/segment.cpp | 57 ++++++++------- be/src/olap/rowset/segment_v2/segment.h | 10 ++- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 6 +- be/src/olap/segment_loader.cpp | 8 ++- be/src/olap/segment_loader.h | 12 ++++ be/src/runtime/load_channel.cpp | 7 -- be/src/runtime/load_channel.h | 1 - be/src/runtime/memory/mem_tracker_limiter.h | 3 - be/src/runtime/query_context.cpp | 2 - be/src/runtime/tablets_channel.cpp | 2 +- be/src/runtime/workload_group/workload_group.cpp | 84 +++++++++++++--------- be/src/runtime/workload_group/workload_group.h | 15 +--- .../workload_group/workload_group_manager.cpp | 7 ++ .../workload_group/workload_group_manager.h | 2 + be/src/service/internal_service.cpp | 16 ++--- 25 files changed, 199 insertions(+), 198 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6e459b3638b..4aea6200ce0 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -626,8 +626,6 @@ DEFINE_mInt32(memtable_hard_limit_active_percent, "50"); // percent of (active memtables size / all memtables size) when reach soft limit DEFINE_mInt32(memtable_soft_limit_active_percent, "50"); -// memtable insert memory tracker will multiply input block size with this ratio -DEFINE_mDouble(memtable_insert_memory_ratio, "1.4"); // max write buffer size before flush, default 200MB DEFINE_mInt64(write_buffer_size, "209715200"); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/common/config.h b/be/src/common/config.h index dc1f60b69f8..13437d96b8e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -674,8 +674,6 @@ DECLARE_mInt32(memtable_hard_limit_active_percent); // percent of (active memtables size / all memtables size) when reach soft limit DECLARE_mInt32(memtable_soft_limit_active_percent); -// memtable insert memory tracker will multiply input block size with this ratio -DECLARE_mDouble(memtable_insert_memory_ratio); // max write buffer size before flush, default 200MB DECLARE_mInt64(write_buffer_size); // max buffer size used in memtable for the aggregated table, default 400MB diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 5da49758865..27fbfb71d7f 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() { // TODO replace memory_gc_thread. // step 6. Refresh weighted memory ratio of workload groups. + doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep(); doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit(); // step 7. Analyze blocking queries. 
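The memtable changes below collapse the old pair of manually maintained trackers (insert + flush) into a single per-memtable MemTracker charged through scoped hooks. A minimal sketch of that scoped-consumption idea, using simplified stand-ins for Doris's MemTracker and SCOPED_CONSUME_MEM_TRACKER rather than the real classes:

    #include <atomic>
    #include <cstddef>
    #include <cstdint>

    // Simplified stand-in for Doris's MemTracker: a thread-safe byte counter.
    class MemTracker {
    public:
        void consume(int64_t bytes) { _consumption.fetch_add(bytes); }
        int64_t consumption() const { return _consumption.load(); }
    private:
        std::atomic<int64_t> _consumption{0};
    };

    // Tracker that allocation hooks charge while a scope is active on this thread.
    thread_local MemTracker* g_thread_tracker = nullptr;

    // RAII scope approximating SCOPED_CONSUME_MEM_TRACKER: install the tracker on
    // entry, restore the previous one on exit. Bytes allocated inside the scope
    // stay charged after it ends; only a hooked free returns them.
    class ScopedConsumeMemTracker {
    public:
        explicit ScopedConsumeMemTracker(MemTracker* t) : _prev(g_thread_tracker) {
            g_thread_tracker = t;
        }
        ~ScopedConsumeMemTracker() { g_thread_tracker = _prev; }
    private:
        MemTracker* _prev;
    };

    // What a malloc hook would do: charge whichever tracker is active.
    void on_allocate(std::size_t bytes) {
        if (g_thread_tracker != nullptr) {
            g_thread_tracker->consume(static_cast<int64_t>(bytes));
        }
    }

With this shape, insert() and the flush path both open a scope on the same _mem_tracker, memory_usage() reduces to the tracker's consumption, and the destructor can assert the tracker drained to zero after a successful flush.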
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 0040e00ffc9..a70486e39b3 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -50,20 +50,16 @@ using namespace ErrorCode; MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, - const std::shared_ptr<MemTracker>& insert_mem_tracker, - const std::shared_ptr<MemTracker>& flush_mem_tracker) - : _tablet_id(tablet_id), + bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info) + : _mem_type(MemType::ACTIVE), + _tablet_id(tablet_id), _enable_unique_key_mow(enable_unique_key_mow), _keys_type(tablet_schema->keys_type()), _tablet_schema(tablet_schema), - _insert_mem_tracker(insert_mem_tracker), - _flush_mem_tracker(flush_mem_tracker), _is_first_insertion(true), _agg_functions(tablet_schema->num_columns()), _offsets_of_aggregate_states(tablet_schema->num_columns()), - _total_size_of_aggregate_states(0), - _mem_usage(0) { + _total_size_of_aggregate_states(0) { g_memtable_cnt << 1; _query_thread_context.init_unlocked(); _arena = std::make_unique<vectorized::Arena>(); @@ -82,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem } // TODO: Support ZOrderComparator in the future _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); + _mem_tracker = std::make_shared<MemTracker>(); } void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, @@ -142,6 +139,13 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) { MemTable::~MemTable() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); + if (_is_flush_success) { + // If the memtable was flushed successfully, its mem tracker's consumption should be 0 + if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) { + LOG(FATAL) << "memtable flush success but consumption is not 0, it is " + << _mem_tracker->consumption(); + } + } g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes(); g_memtable_cnt << -1; if (_keys_type != KeysType::DUP_KEYS) { @@ -159,13 +163,7 @@ MemTable::~MemTable() { } } std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>()); - _insert_mem_tracker->release(_mem_usage); - _flush_mem_tracker->set_consumption(0); - DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl - << _insert_mem_tracker->log_usage(); - DCHECK_EQ(_flush_mem_tracker->consumption(), 0); _arena.reset(); - _agg_buffer_pool.clear(); _vec_row_comparator.reset(); _row_in_blocks.clear(); _agg_functions.clear(); @@ -180,6 +178,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r Status MemTable::insert(const vectorized::Block* input_block, const std::vector<uint32_t>& row_idxs) { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); if (_is_first_insertion) { _is_first_insertion = false; auto clone_block = input_block->clone_without_columns(&_column_offset); @@ -214,10 +213,6 @@ Status MemTable::insert(const vectorized::Block* input_block, row_idxs.data() + num_rows, &_column_offset)); auto block_size1 = _input_mutable_block.allocated_bytes(); g_memtable_input_block_allocated_size << block_size1 - block_size0; - auto input_size = size_t(input_block->bytes() * num_rows / input_block->rows() * - config::memtable_insert_memory_ratio); - _mem_usage += input_size; -
_insert_mem_tracker->consume(input_size); for (int i = 0; i < num_rows; i++) { _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); } @@ -467,10 +462,6 @@ void MemTable::_aggregate() { } if constexpr (!is_final) { // if is not final, we collect the agg results to input_block and then continue to insert - size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); - // flush will not run here, so will not duplicate `_flush_mem_tracker` - _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); - _mem_usage = shrunked_after_agg; _input_mutable_block.swap(_output_mutable_block); //TODO(weixang):opt here. std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); @@ -483,6 +474,7 @@ void MemTable::_aggregate() { } void MemTable::shrink_memtable_by_agg() { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); if (_keys_type == KeysType::DUP_KEYS) { return; } @@ -532,8 +524,8 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) { } g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes(); _input_mutable_block.clear(); - _insert_mem_tracker->release(_mem_usage); - _mem_usage = 0; + // After to_block(), all data in the arena is saved in the block + _arena.reset(); *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); return Status::OK(); }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 70f7a9f22a0..4ae92c2d2d8 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -47,6 +47,11 @@ class TabletSchema; class TupleDescriptor; enum KeysType : int; +// ACTIVE: the memtable is currently being used by the writer to insert blocks +// WRITE_FINISHED: the memtable has finished writing blocks and is in the queue waiting to be flushed +// FLUSH: the memtable is being flushed, writing segments to disk.
+enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 }; + // row pos in _input_mutable_block struct RowInBlock { size_t _row_pos; @@ -171,16 +176,11 @@ class MemTable { public: MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, - bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, - const std::shared_ptr<MemTracker>& insert_mem_tracker, - const std::shared_ptr<MemTracker>& flush_mem_tracker); + bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info); ~MemTable(); int64_t tablet_id() const { return _tablet_id; } - size_t memory_usage() const { - return _insert_mem_tracker->consumption() + _arena->used_size() + - _flush_mem_tracker->consumption(); - } + size_t memory_usage() const { return _mem_tracker->consumption(); } // insert tuple from (row_pos) to (row_pos+num_rows) Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); @@ -196,10 +196,16 @@ public: const MemTableStat& stat() { return _stat; } - std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; } - QueryThreadContext query_thread_context() { return _query_thread_context; } + std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; } + + void set_flush_success() { _is_flush_success = true; } + + MemType get_mem_type() { return _mem_type; } + + void update_mem_type(MemType memtype) { _mem_type = memtype; } + private: // for vectorized void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, @@ -209,9 +215,11 @@ private: Status _to_block(std::unique_ptr<vectorized::Block>* res); private: + std::atomic<MemType> _mem_type; int64_t _tablet_id; bool _enable_unique_key_mow = false; bool _is_partial_update = false; + bool _is_flush_success = false; const KeysType _keys_type; std::shared_ptr<TabletSchema> _tablet_schema; @@ -219,18 +227,11 @@ private: QueryThreadContext _query_thread_context; - // `_insert_manual_mem_tracker` manually records the memory value of memtable insert() - // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook. - // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable - // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit. - std::shared_ptr<MemTracker> _insert_mem_tracker; - std::shared_ptr<MemTracker> _flush_mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; // Only the rows will be inserted into block can allocate memory from _arena. // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually // reduce the number of segment files that are generated by current load std::unique_ptr<vectorized::Arena> _arena; - // The object buffer pool for convert tuple to row - ObjectPool _agg_buffer_pool; void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, const TupleDescriptor* tuple_desc); @@ -264,8 +265,6 @@ private: std::vector<size_t> _offsets_of_aggregate_states; size_t _total_size_of_aggregate_states; std::vector<RowInBlock*> _row_in_blocks; - // Memory usage without _arena. 
- size_t _mem_usage; size_t _num_columns; int32_t _seq_col_idx_in_block = -1; diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 887340eed70..dc911647be8 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -46,10 +46,10 @@ class MemtableFlushTask final : public Runnable { ENABLE_FACTORY_CREATOR(MemtableFlushTask); public: - MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable, + MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable, int32_t segment_id, int64_t submit_task_time) : _flush_token(flush_token), - _memtable(std::move(memtable)), + _memtable(memtable), _segment_id(segment_id), _submit_task_time(submit_task_time) { g_flush_task_num << 1; @@ -60,7 +60,7 @@ public: void run() override { auto token = _flush_token.lock(); if (token) { - token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time); + token->_flush_memtable(_memtable, _segment_id, _submit_task_time); } else { LOG(WARNING) << "flush token is deconstructed, ignore the flush task"; } @@ -68,7 +68,7 @@ public: private: std::weak_ptr<FlushToken> _flush_token; - std::unique_ptr<MemTable> _memtable; + std::shared_ptr<MemTable> _memtable; int32_t _segment_id; int64_t _submit_task_time; }; @@ -83,7 +83,7 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { return os; } -Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) { +Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) { { std::shared_lock rdlk(_flush_status_lock); DBUG_EXECUTE_IF("FlushToken.submit_flush_error", { @@ -98,9 +98,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) { return Status::OK(); } int64_t submit_task_time = MonotonicNanos(); - auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table), - _rowset_writer->allocate_segment_id(), - submit_task_time); + auto task = MemtableFlushTask::create_shared( + shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time); Status ret = _thread_pool->submit(std::move(task)); if (ret.ok()) { // _wait_running_task_finish was executed after this function, so no need to notify _cond here @@ -136,20 +135,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id() << ", memsize: " << memtable->memory_usage() << ", rows: " << memtable->stat().raw_rows; + memtable->update_mem_type(MemType::FLUSH); int64_t duration_ns; SCOPED_RAW_TIMER(&duration_ns); SCOPED_ATTACH_TASK(memtable->query_thread_context()); signal::set_signal_task_id(_rowset_writer->load_id()); signal::tablet_id = memtable->tablet_id(); { + SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker()); std::unique_ptr<vectorized::Block> block; - // During to block method, it will release old memory and create new block, so that - // we could not scoped it. 
RETURN_IF_ERROR(memtable->to_block(&block)); - memtable->flush_mem_tracker()->consume(block->allocated_bytes()); - SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker()); RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size)); } + memtable->set_flush_success(); _memtable_stat += memtable->stat(); DorisMetrics::instance()->memtable_flush_total->increment(1); DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); @@ -158,7 +156,7 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in return Status::OK(); } -void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id, +void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id, int64_t submit_task_time) { Defer defer {[&]() { std::lock_guard<std::mutex> lock(_mutex); diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 2d20298f800..25c5a37afba 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -61,7 +61,7 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> { public: FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {} - Status submit(std::unique_ptr<MemTable> mem_table); + Status submit(std::shared_ptr<MemTable> mem_table); // error has happens, so we cancel this token // And remove all tasks in the queue. @@ -87,7 +87,7 @@ private: private: friend class MemtableFlushTask; - void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id, + void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id, int64_t submit_task_time); Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size); diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index ea045b1e53e..9b9ce19f895 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -20,6 +20,7 @@ #include <bvar/bvar.h> #include "common/config.h" +#include "olap/memtable.h" #include "olap/memtable_writer.h" #include "util/doris_metrics.h" #include "util/mem_info.h" @@ -237,13 +238,14 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { _active_writers.clear(); for (auto it = _writers.begin(); it != _writers.end();) { if (auto writer = it->lock()) { + // The memtable is currently used by writer to insert blocks. 
auto active_usage = writer->active_memtable_mem_consumption(); _active_mem_usage += active_usage; if (active_usage > 0) { _active_writers.push_back(writer); } _flush_mem_usage += writer->mem_consumption(MemType::FLUSH); - _write_mem_usage += writer->mem_consumption(MemType::WRITE); + _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED); ++it; } else { *it = std::move(_writers.back()); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index 59916d5f1cc..e8123c48ecc 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -133,12 +133,18 @@ Status MemTableWriter::write(const vectorized::Block* block, Status MemTableWriter::_flush_memtable_async() { DCHECK(_flush_token != nullptr); - std::unique_ptr<MemTable> memtable; + std::shared_ptr<MemTable> memtable; { std::lock_guard<SpinLock> l(_mem_table_ptr_lock); - memtable = std::move(_mem_table); + memtable = _mem_table; + _mem_table = nullptr; } - return _flush_token->submit(std::move(memtable)); + { + std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + memtable->update_mem_type(MemType::WRITE_FINISHED); + _freezed_mem_tables.push_back(memtable); + } + return _flush_token->submit(memtable); } Status MemTableWriter::flush_async() { @@ -187,22 +193,10 @@ Status MemTableWriter::wait_flush() { } void MemTableWriter::_reset_mem_table() { - auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format( - "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", - std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string())); - auto mem_table_flush_tracker = std::make_shared<MemTracker>(fmt::format( - "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()), - _mem_table_num++, UniqueId(_req.load_id).to_string())); - { - std::lock_guard<SpinLock> l(_mem_table_tracker_lock); - _mem_table_insert_trackers.push_back(mem_table_insert_tracker); - _mem_table_flush_trackers.push_back(mem_table_flush_tracker); - } { std::lock_guard<SpinLock> l(_mem_table_ptr_lock); _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc, - _unique_key_mow, _partial_update_info.get(), - mem_table_insert_tracker, mem_table_flush_tracker)); + _unique_key_mow, _partial_update_info.get())); } _segment_num++; @@ -353,15 +347,11 @@ int64_t MemTableWriter::mem_consumption(MemType mem) { } int64_t mem_usage = 0; { - std::lock_guard<SpinLock> l(_mem_table_tracker_lock); - if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2 - for (const auto& mem_table_tracker : _mem_table_insert_trackers) { - mem_usage += mem_table_tracker->consumption(); - } - } - if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1 - for (const auto& mem_table_tracker : _mem_table_flush_trackers) { - mem_usage += mem_table_tracker->consumption(); + std::lock_guard<SpinLock> l(_mem_table_ptr_lock); + for (const auto& mem_table : _freezed_mem_tables) { + auto mem_table_sptr = mem_table.lock(); + if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) { + mem_usage += mem_table_sptr->memory_usage(); } } } diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index ee7c8e1538a..ec44348b4a9 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -57,8 +57,6 @@ namespace vectorized { class Block; } // namespace vectorized -enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 }; - // Writer for a particular (load, index, tablet). 
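The writer/flush handoff above is a small state machine per memtable: ACTIVE while the writer inserts, WRITE_FINISHED once it is frozen and queued, FLUSH while a flush thread writes segments. A sketch of those transitions as hypothetical free-standing code (the Doris call sites are noted in comments; this is not the actual class):

    #include <atomic>

    // Mirrors the MemType enum added in memtable.h.
    enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 };

    class MemTableLifecycle {
    public:
        // A memtable starts ACTIVE while the writer inserts blocks into it.
        MemTableLifecycle() : _type(ACTIVE) {}

        // MemTableWriter::_flush_memtable_async(): the writer freezes the
        // memtable, remembers it in _freezed_mem_tables, and queues the flush.
        void freeze() { _type.store(WRITE_FINISHED); }

        // FlushToken::_do_flush_memtable(): a flush thread picks it up.
        void begin_flush() { _type.store(FLUSH); }

        // After flush_memtable() succeeds; ~MemTable() can then assert that
        // the mem tracker has drained to zero.
        void finish_flush() { _flush_success = true; }

        MemType type() const { return _type.load(); }
        bool flush_success() const { return _flush_success; }

    private:
        std::atomic<MemType> _type;
        bool _flush_success = false;
    };

Keeping the state on the memtable itself is what lets mem_consumption(MemType) walk _freezed_mem_tables and bucket usage by stage instead of maintaining parallel tracker vectors.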
// This class is NOT thread-safe, external synchronization is required. class MemTableWriter { @@ -123,18 +121,17 @@ private: Status _cancel_status; WriteRequest _req; std::shared_ptr<RowsetWriter> _rowset_writer; - std::unique_ptr<MemTable> _mem_table; + std::shared_ptr<MemTable> _mem_table; TabletSchemaSPtr _tablet_schema; bool _unique_key_mow = false; // This variable is accessed from writer thread and token flush thread // use a shared ptr to avoid use after free problem. std::shared_ptr<FlushToken> _flush_token; - std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers; - std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers; - SpinLock _mem_table_tracker_lock; + // Save the inactive memtables that are in the flush queue or being flushed. + std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables; + // The lock protecting _mem_table and _freezed_mem_tables against concurrent modification and reads SpinLock _mem_table_ptr_lock; - std::atomic<uint32_t> _mem_table_num = 1; QueryThreadContext _query_thread_context; std::mutex _lock;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index f0710a5e2ba..11457a7a332 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -450,25 +450,12 @@ Status Segment::_load_pk_bloom_filter() { DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS); DCHECK(_pk_index_meta != nullptr); DCHECK(_pk_index_reader != nullptr); - auto status = [this]() { - return _load_pk_bf_once.call([this] { - RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta)); - // _meta_mem_usage += _pk_index_reader->get_bf_memory_size(); - return Status::OK(); - }); - }(); - if (!status.ok()) { - remove_from_segment_cache(); - } - return status; -} -void Segment::remove_from_segment_cache() const { - if (config::disable_segment_cache) { - return; - } - SegmentCache::CacheKey cache_key(_rowset_id, _segment_id); - SegmentLoader::instance()->erase_segment(cache_key); + return _load_pk_bf_once.call([this] { + RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta)); + // _meta_mem_usage += _pk_index_reader->get_bf_memory_size(); + return Status::OK(); + }); } Status Segment::load_pk_index_and_bf() { @@ -478,14 +465,6 @@ Status Segment::load_pk_index_and_bf() { } Status Segment::load_index() { - auto status = [this]() { return _load_index_impl(); }(); - if (!status.ok()) { - remove_from_segment_cache(); - } - return status; -} - -Status Segment::_load_index_impl() { return _load_index_once.call([this] { if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) { _pk_index_reader = std::make_unique<PrimaryKeyIndexReader>(); @@ -519,6 +498,32 @@ Status Segment::_load_index_impl() { }); } +Status Segment::healthy_status() { + try { + if (_load_index_once.has_called()) { + RETURN_IF_ERROR(_load_index_once.stored_result()); + } + if (_load_pk_bf_once.has_called()) { + RETURN_IF_ERROR(_load_pk_bf_once.stored_result()); + } + if (_create_column_readers_once_call.has_called()) { + RETURN_IF_ERROR(_create_column_readers_once_call.stored_result()); + } + if (_inverted_index_file_reader_open.has_called()) { + RETURN_IF_ERROR(_inverted_index_file_reader_open.stored_result()); + } + // This status is set at runtime, for example, when something goes wrong while reading via a segment iterator.
+ return _healthy_status.status(); + } catch (const doris::Exception& e) { + // If there is an exception during load_xxx, we should not rethrow it directly because + // the caller may not be exception safe. + return e.to_status(); + } catch (const std::exception& e) { + // The exception is not thrown by doris code. + return Status::InternalError("Unexpected error during load segment: {}", e.what()); + } +} + // Return the storage datatype of related column to field. // Return nullptr meaning no such storage infomation for this column vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier,
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index a4f01873f4c..035860b9bc9 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -142,6 +142,12 @@ public: Status load_pk_index_and_bf(); + void update_healthy_status(Status new_status) { _healthy_status.update(new_status); } + // The segment is loaded into SegmentCache and then lazily loads its indices. If something goes wrong + // while loading indices, the segment should be removed from SegmentCache; otherwise it would keep reporting errors during + // queries. So we add a healthy status API; the caller should check the healthy status before using the segment. + Status healthy_status(); + std::string min_key() { DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr); return _pk_index_meta->min_key(); @@ -155,8 +161,6 @@ public: int64_t meta_mem_usage() const { return _meta_mem_usage; } - void remove_from_segment_cache() const; - // Identify the column by unique id or path info struct ColumnIdentifier { int32_t unique_id = -1; @@ -222,7 +226,6 @@ private: Status _write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data, io::IOContext& io_ctx); - Status _load_index_impl(); Status _open_inverted_index(); Status _create_column_readers_once(); @@ -233,6 +236,7 @@ private: io::FileReaderSPtr _file_reader; uint32_t _segment_id; uint32_t _num_rows; + AtomicStatus _healthy_status; // 1. Tracking memory use by segment meta data such as footer or index page. // 2. Tracking memory use by segment column reader
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 3d8e06bbc00..04ec5830d28 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -270,8 +270,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr sc Status SegmentIterator::init(const StorageReadOptions& opts) { auto status = _init_impl(opts); - if (!status.ok() && !config::disable_segment_cache) { - _segment->remove_from_segment_cache(); + if (!status.ok()) { + _segment->update_healthy_status(status); } return status; } @@ -1925,7 +1925,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
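Rather than evicting a segment from SegmentCache on every failure, the patch records the first error on the segment and lets the next cache lookup swap the unhealthy entry out. A minimal sketch of a first-error-wins status holder, with simplified Status/AtomicStatus types standing in for Doris's:

    #include <mutex>
    #include <string>
    #include <utility>

    // Simplified stand-in for Doris's Status.
    struct Status {
        bool ok = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status Error(std::string m) { return {false, std::move(m)}; }
    };

    // Keeps the first non-OK status; later updates never overwrite it.
    class AtomicStatus {
    public:
        void update(const Status& s) {
            std::lock_guard<std::mutex> l(_lock);
            if (_status.ok && !s.ok) {
                _status = s;
            }
        }
        Status status() const {
            std::lock_guard<std::mutex> l(_lock);
            return _status;
        }
    private:
        mutable std::mutex _lock;
        Status _status = Status::OK();
    };

healthy_status() then combines this runtime status with the stored results of the call-once loaders, so a cached segment that failed any lazy initialization reports unhealthy and gets replaced by SegmentLoader via pop_unhealthy_segment().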
if (!status.ok() && !status.is<END_OF_FILE>()) { - _segment->remove_from_segment_cache(); + _segment->update_healthy_status(status); } return status; }
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index fd7e3f476ad..abc82c6f3ee 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -59,8 +59,14 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, for (int64_t i = 0; i < rowset->num_segments(); i++) { SegmentCache::CacheKey cache_key(rowset->rowset_id(), i); if (_segment_cache->lookup(cache_key, cache_handle)) { - continue; + // Have to check the segment status here, because the segment in the cache may have something wrong during + // index load or column reader creation. + // Do not merge this if-branch with the previous one, to keep the logic clear. + if (cache_handle->pop_unhealthy_segment() == nullptr) { + continue; + } } + // If the segment is not healthy, a new segment will be created and will replace the unhealthy one in SegmentCache. segment_v2::SegmentSharedPtr segment; RETURN_IF_ERROR(rowset->load_segment(i, &segment)); if (need_load_pk_index_and_bf) {
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index d177024242d..b3b88fa7700 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -161,6 +161,18 @@ public: _init = true; } + segment_v2::SegmentSharedPtr pop_unhealthy_segment() { + if (segments.empty()) { + return nullptr; + } + segment_v2::SegmentSharedPtr last_segment = segments.back(); + if (last_segment->healthy_status().ok()) { + return nullptr; + } + segments.pop_back(); + return last_segment; + } + private: std::vector<segment_v2::SegmentSharedPtr> segments; bool _init {false};
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index f8c11639719..1ac7753b197 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig if (workload_group_ptr) { wg_ptr = workload_group_ptr; wg_ptr->add_mem_tracker_limiter(mem_tracker); - _need_release_memtracker = true; } } } @@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() { rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first << ", num_rows_filtered: " << entry.second.second; } - if (_need_release_memtracker) { - WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr(); - if (wg_ptr) { - wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker()); - } - } LOG(INFO) << "load channel removed" << " load_id=" << _load_id << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip << rows_str.str();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 6fad8c536ec..6c150ed74d9 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -127,7 +127,6 @@ private: int64_t _backend_id; bool _enable_profile; - bool _need_release_memtracker = false; }; inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index faf354cca4c..251a7c25a74 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -123,9 +123,6 @@ public: bool is_query_cancelled() { return _is_query_cancelled; } void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); } - //
Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove. - std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it; - /* * Part 3, Memory tracking method (use carefully!) *
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 497041ac17b..046de58fe5f 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -159,8 +159,6 @@ QueryContext::~QueryContext() { uint64_t group_id = 0; if (_workload_group) { group_id = _workload_group->id(); // before remove - _workload_group->remove_mem_tracker_limiter(query_mem_tracker); - _workload_group->remove_query(_query_id); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 329366766f8..4d458cd440f 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -446,7 +446,7 @@ void BaseTabletsChannel::refresh_profile() { { std::lock_guard<SpinLock> l(_tablet_writers_lock); for (auto&& [tablet_id, writer] : _tablet_writers) { - int64_t write_mem = writer->mem_consumption(MemType::WRITE); + int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED); write_mem_usage += write_mem; int64_t flush_mem = writer->mem_consumption(MemType::FLUSH); flush_mem_usage += flush_mem;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 6f3b51f09fd..0488e9ec83c 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { } } +// The MemTrackerLimiter is not removed when the query context is released, so it must be removed here. int64_t WorkloadGroup::make_memory_tracker_snapshots( std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) { int64_t used_memory = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); - for (const auto& trackerWptr : mem_tracker_group.trackers) { - auto tracker = trackerWptr.lock(); - CHECK(tracker != nullptr); - if (tracker_snapshots != nullptr) { - tracker_snapshots->insert(tracker_snapshots->end(), tracker); + for (auto trackerWptr = mem_tracker_group.trackers.begin(); + trackerWptr != mem_tracker_group.trackers.end();) { + auto tracker = trackerWptr->lock(); + if (tracker == nullptr) { + trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); + } else { + if (tracker_snapshots != nullptr) { + tracker_snapshots->insert(tracker_snapshots->end(), tracker); + } + used_memory += tracker->consumption(); + ++trackerWptr; } - used_memory += tracker->consumption(); } } - refresh_memory(used_memory); + // refresh total memory used. + _total_mem_used = used_memory; + // reserve memory is recorded in the query mem tracker + // and _total_mem_used already contains all the current reserve memory. + // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. + _wg_refresh_interval_memory_growth.store(0.0); _mem_used_status->set_value(used_memory); return used_memory; } @@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() { return make_memory_tracker_snapshots(nullptr); } -void WorkloadGroup::refresh_memory(int64_t used_memory) { - // refresh total memory used.
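Both make_memory_tracker_snapshots() and the new do_sweep() below use the same erase-while-iterating idiom to drop expired weak_ptrs in place instead of relying on explicit remove calls. A generic sketch of that loop shape (generic container and visitor, not the Doris types):

    #include <list>
    #include <memory>

    // Drop expired weak_ptrs from a list, visiting the live ones.
    template <typename T, typename Visit>
    void sweep(std::list<std::weak_ptr<T>>& items, Visit visit) {
        for (auto it = items.begin(); it != items.end();) {
            if (auto sp = it->lock()) {  // owner still alive: visit and keep
                visit(*sp);
                ++it;
            } else {                     // owner destroyed: unlink the entry
                it = items.erase(it);    // erase returns the next iterator
            }
        }
    }

This is why remove_mem_tracker_limiter() and remove_query() can be deleted: registration no longer hands out iterators to erase later, and dead entries are reclaimed lazily by the periodic sweep in the daemon thread.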
- _total_mem_used = used_memory; - // reserve memory is recorded in the query mem tracker - // and _total_mem_used already contains all the current reserve memory. - // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. - _wg_refresh_interval_memory_growth.store(0.0); -} +void WorkloadGroup::do_sweep() { + // Clear mem tracker limiters that were registered during query or load. + for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { + std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); + for (auto trackerWptr = mem_tracker_group.trackers.begin(); + trackerWptr != mem_tracker_group.trackers.end();) { + auto tracker = trackerWptr->lock(); + if (tracker == nullptr) { + trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); + } else { + ++trackerWptr; + } + } + } -void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { + // Clear query contexts that were registered in the query context ctor std::unique_lock<std::shared_mutex> wlock(_mutex); - auto group_num = mem_tracker_ptr->group_num(); - std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); - mem_tracker_ptr->wg_tracker_limiter_group_it = - _mem_tracker_limiter_pool[group_num].trackers.insert( - _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); + for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) { + if (iter->second.lock() == nullptr) { + iter = _query_ctxs.erase(iter); + } else { + iter++; + } + } } -void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { +void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) { std::unique_lock<std::shared_mutex> wlock(_mutex); auto group_num = mem_tracker_ptr->group_num(); std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock); - if (mem_tracker_ptr->wg_tracker_limiter_group_it != - _mem_tracker_limiter_pool[group_num].trackers.end()) { - _mem_tracker_limiter_pool[group_num].trackers.erase( - mem_tracker_ptr->wg_tracker_limiter_group_it); - mem_tracker_ptr->wg_tracker_limiter_group_it = - _mem_tracker_limiter_pool[group_num].trackers.end(); - } + _mem_tracker_limiter_pool[group_num].trackers.insert( + _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr); } int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) { @@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( - "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, Execute " + "{} cancel top memory overcommit tracker <{}> consumption {}. details:{}, " + "Execute " "again after enough memory, details see be.INFO.", cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); }; auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { return fmt::format( - "{} cancel top memory used tracker <{}> consumption {}. details:{}, Execute again " + "{} cancel top memory used tracker <{}> consumption {}. 
details:{}, Execute " + "again " "after enough memory, details see be.INFO.", cancel_str, label, MemCounter::print_bytes(mem_consumption), GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str()); @@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, _id, _name, _memory_limit, used_memory, need_free_mem); Defer defer {[&]() { LOG(INFO) << fmt::format( - "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: " + "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, " + "used: " "{}, need_free_mem: {}, freed memory: {}.", _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem); }}; @@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e _cgroup_cpu_ctl->update_cpu_soft_limit( CgroupCpuCtl::cpu_soft_limit_default_value()); } else { - LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: " + LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is " + "illegal: " << cpu_hard_limit << ", gid=" << tg_id; } } else { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 2fbb4dd3030..933c5afdb4e 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -89,7 +89,8 @@ public: std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots); // call make_memory_tracker_snapshots, so also refresh total memory used. int64_t memory_used(); - void refresh_memory(int64_t used_memory); + + void do_sweep(); int spill_threshold_low_water_mark() const { return _spill_low_watermark.load(std::memory_order_relaxed); @@ -132,8 +133,6 @@ public: void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); - void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); - // when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC // because mem_limit is not a required property bool is_mem_limit_valid() { @@ -154,11 +153,6 @@ public: return Status::OK(); } - void remove_query(TUniqueId query_id) { - std::unique_lock<std::shared_mutex> wlock(_mutex); - _query_ctxs.erase(query_id); - } - void shutdown() { std::unique_lock<std::shared_mutex> wlock(_mutex); _is_shutdown = true; @@ -169,11 +163,6 @@ public: return _is_shutdown && _query_ctxs.empty(); } - int query_num() { - std::shared_lock<std::shared_mutex> r_lock(_mutex); - return _query_ctxs.size(); - } - int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc); void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 65a8e3685c8..003f07f1db0 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i << ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size; } +void WorkloadGroupMgr::do_sweep() { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + wg->do_sweep(); + } +} + struct WorkloadGroupMemInfo { int64_t total_mem_used = 0; std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots = diff --git 
a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index d8547c3383e..f76e98d2606 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -50,6 +50,8 @@ public: WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id); + void do_sweep(); + void stop(); std::atomic<bool> _enable_cpu_hard_limit = false;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 06dad46e90c..8dc4d7bb3c2 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1669,17 +1669,13 @@ void PInternalService::reset_rpc_channel(google::protobuf::RpcController* contro void PInternalService::hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - bool ret = _light_work_pool.try_offer([request, response, done]() { - brpc::ClosureGuard closure_guard(done); - if (request->has_hello()) { - response->set_hello(request->hello()); - } - response->mutable_status()->set_status_code(0); - }); - if (!ret) { - offer_failed(response, done, _light_work_pool); - return; + // The light pool may be full. Handshake is used to check the brpc connection state, + // so it should not be affected by the thread pool logic. + brpc::ClosureGuard closure_guard(done); + if (request->has_hello()) { + response->set_hello(request->hello()); } + response->mutable_status()->set_status_code(0); } constexpr char HttpProtocol[] = "http://";

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org