This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a73f4dfdc1 [fix](memtracker) Fix scanner thread ending after fragment thread causing mem tracker null pointer #14143 a73f4dfdc1 is described below commit a73f4dfdc1dacb5d96a27163c1befa0d770cf348 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Nov 10 15:42:53 2022 +0800 [fix](memtracker) Fix scanner thread ending after fragment thread causing mem tracker null pointer #14143 --- be/src/exec/tablet_sink.cpp | 8 ++++---- be/src/exec/tablet_sink.h | 4 ++-- be/src/olap/memtable.cpp | 2 +- be/src/olap/olap_server.cpp | 4 ++-- be/src/olap/storage_engine.cpp | 2 +- be/src/olap/storage_engine.h | 2 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +- be/src/vec/sink/vtablet_sink.cpp | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 0da26b8d42..94294385d6 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -46,7 +46,7 @@ namespace stream_load { NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id) : _parent(parent), _index_channel(index_channel), _node_id(node_id) { - _node_channel_tracker = std::make_unique<MemTracker>(fmt::format( + _node_channel_tracker = std::make_shared<MemTracker>(fmt::format( "NodeChannel:indexID={}:threadId={}", std::to_string(_index_channel->_index_id), thread_context()->get_thread_id())); } @@ -518,7 +518,7 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state, void NodeChannel::try_send_batch(RuntimeState* state) { SCOPED_ATOMIC_TIMER(&_actual_consume_ns); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker); AddBatchReq send_batch; { debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; @@ -833,7 +833,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { // profile must add to state's object pool _profile = 
state->obj_pool()->add(new RuntimeProfile("OlapTableSink")); _mem_tracker = - std::make_unique<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id())); + std::make_shared<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id())); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -1407,7 +1407,7 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma void OlapTableSink::_send_batch_process(RuntimeState* state) { SCOPED_TIMER(_non_blocking_send_timer); SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); do { int running_channels_num = 0; for (auto index_channel : _channels) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 45552329bf..fb820d6c42 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -260,7 +260,7 @@ protected: std::string _load_info; std::string _name; - std::unique_ptr<MemTracker> _node_channel_tracker; + std::shared_ptr<MemTracker> _node_channel_tracker; TupleDescriptor* _tuple_desc = nullptr; NodeInfo _node_info; @@ -466,7 +466,7 @@ protected: bool _is_vectorized = false; - std::unique_ptr<MemTracker> _mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; ObjectPool* _pool; const RowDescriptor& _input_row_desc; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 9faf757bf9..c89afe8169 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -434,7 +434,7 @@ Status MemTable::_generate_delete_bitmap() { } Status MemTable::flush() { - SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker); VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id() << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index ead81f7367..0462ee6c64 100644 --- 
a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -125,7 +125,7 @@ Status StorageEngine::start_bg_threads() { RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_scan_thread", [this, data_dir]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); this->_path_scan_thread_callback(data_dir); }, &path_scan_thread)); @@ -135,7 +135,7 @@ Status StorageEngine::start_bg_threads() { RETURN_IF_ERROR(Thread::create( "StorageEngine", "path_gc_thread", [this, data_dir]() { - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); this->_path_gc_thread_callback(data_dir); }, &path_gc_thread)); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 083814fef0..5eab6fa5f3 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -106,7 +106,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _available_storage_medium_type_count(0), _effective_cluster_id(-1), _is_all_cluster_id_exist(true), - _mem_tracker(std::make_unique<MemTracker>("StorageEngine")), + _mem_tracker(std::make_shared<MemTracker>("StorageEngine")), _segcompaction_mem_tracker(std::make_unique<MemTracker>("SegCompaction")), _segment_meta_mem_tracker(std::make_unique<MemTracker>("SegmentMeta")), _stop_background_threads_latch(1), diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index ab7fa4ac7d..46607b241e 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -324,7 +324,7 @@ private: std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets; // StorageEngine oneself - std::unique_ptr<MemTracker> _mem_tracker; + std::shared_ptr<MemTracker> _mem_tracker; // Count the memory consumption of segment compaction tasks. std::unique_ptr<MemTracker> _segcompaction_mem_tracker; // This mem tracker is only for tracking memory use by segment meta data such as footer or index page. 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 09578a2ba1..e01405ecad 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -186,7 +186,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext INIT_AND_SCOPE_REENTRANT_SPAN_IF(ctx->state()->enable_profile(), ctx->state()->get_tracer(), ctx->scan_span(), "VScanner::scan"); SCOPED_ATTACH_TASK(scanner->runtime_state()); - SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker().get()); + SCOPED_CONSUME_MEM_TRACKER(scanner->runtime_state()->scanner_mem_tracker()); Thread::set_self_name("_scanner_scan"); scanner->update_wait_worker_timer(); // Do not use ScopedTimer. There is no guarantee that, the counter diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 04ce7c54dc..a6e4a96566 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -247,7 +247,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, void VNodeChannel::try_send_block(RuntimeState* state) { SCOPED_ATTACH_TASK(state); - SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker); SCOPED_ATOMIC_TIMER(&_actual_consume_ns); AddBlockReq send_block; { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org