This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2b965303d8f [branch-3.0] pick some bugfix (#42128)
2b965303d8f is described below

commit 2b965303d8f92ffa66609264c300da1c71879242
Author: yiguolei <676222...@qq.com>
AuthorDate: Sat Oct 19 18:19:28 2024 +0800

    [branch-3.0] pick some bugfix (#42128)
---
 be/src/common/config.cpp                           |  2 -
 be/src/common/config.h                             |  2 -
 be/src/common/daemon.cpp                           |  1 +
 be/src/olap/memtable.cpp                           | 40 +++++------
 be/src/olap/memtable.h                             | 37 +++++-----
 be/src/olap/memtable_flush_executor.cpp            | 24 +++----
 be/src/olap/memtable_flush_executor.h              |  4 +-
 be/src/olap/memtable_memory_limiter.cpp            |  4 +-
 be/src/olap/memtable_writer.cpp                    | 40 ++++-------
 be/src/olap/memtable_writer.h                      | 11 ++-
 be/src/olap/rowset/segment_v2/segment.cpp          | 57 ++++++++-------
 be/src/olap/rowset/segment_v2/segment.h            | 10 ++-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  6 +-
 be/src/olap/segment_loader.cpp                     |  8 ++-
 be/src/olap/segment_loader.h                       | 12 ++++
 be/src/runtime/load_channel.cpp                    |  7 --
 be/src/runtime/load_channel.h                      |  1 -
 be/src/runtime/memory/mem_tracker_limiter.h        |  3 -
 be/src/runtime/query_context.cpp                   |  2 -
 be/src/runtime/tablets_channel.cpp                 |  2 +-
 be/src/runtime/workload_group/workload_group.cpp   | 84 +++++++++++++---------
 be/src/runtime/workload_group/workload_group.h     | 15 +---
 .../workload_group/workload_group_manager.cpp      |  7 ++
 .../workload_group/workload_group_manager.h        |  2 +
 be/src/service/internal_service.cpp                | 16 ++---
 25 files changed, 199 insertions(+), 198 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6e459b3638b..4aea6200ce0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -626,8 +626,6 @@ DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
 // percent of (active memtables size / all memtables size) when reach soft limit
 DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
 
-// memtable insert memory tracker will multiply input block size with this ratio
-DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index dc1f60b69f8..13437d96b8e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -674,8 +674,6 @@ DECLARE_mInt32(memtable_hard_limit_active_percent);
 // percent of (active memtables size / all memtables size) when reach soft limit
 DECLARE_mInt32(memtable_soft_limit_active_percent);
 
-// memtable insert memory tracker will multiply input block size with this ratio
-DECLARE_mDouble(memtable_insert_memory_ratio);
 // max write buffer size before flush, default 200MB
 DECLARE_mInt64(write_buffer_size);
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5da49758865..27fbfb71d7f 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -296,6 +296,7 @@ void Daemon::memory_maintenance_thread() {
         // TODO replace memory_gc_thread.
 
         // step 6. Refresh weighted memory ratio of workload groups.
+        doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
         doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
 
         // step 7. Analyze blocking queries.
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 0040e00ffc9..a70486e39b3 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -50,20 +50,16 @@ using namespace ErrorCode;
 
 MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-                   bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
-                   const std::shared_ptr<MemTracker>& insert_mem_tracker,
-                   const std::shared_ptr<MemTracker>& flush_mem_tracker)
-        : _tablet_id(tablet_id),
+                   bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info)
+        : _mem_type(MemType::ACTIVE),
+          _tablet_id(tablet_id),
           _enable_unique_key_mow(enable_unique_key_mow),
           _keys_type(tablet_schema->keys_type()),
           _tablet_schema(tablet_schema),
-          _insert_mem_tracker(insert_mem_tracker),
-          _flush_mem_tracker(flush_mem_tracker),
           _is_first_insertion(true),
           _agg_functions(tablet_schema->num_columns()),
           _offsets_of_aggregate_states(tablet_schema->num_columns()),
-          _total_size_of_aggregate_states(0),
-          _mem_usage(0) {
+          _total_size_of_aggregate_states(0) {
     g_memtable_cnt << 1;
     _query_thread_context.init_unlocked();
     _arena = std::make_unique<vectorized::Arena>();
@@ -82,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
     }
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
+    _mem_tracker = std::make_shared<MemTracker>();
 }
 
 void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
@@ -142,6 +139,13 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 
 MemTable::~MemTable() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
+    if (_is_flush_success) {
+        // If the memtable flush succeeded, its memtracker's consumption should be 0
+        if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
+            LOG(FATAL) << "memtable flush success but consumption is not 0, it is "
+                       << _mem_tracker->consumption();
+        }
+    }
     g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
     g_memtable_cnt << -1;
     if (_keys_type != KeysType::DUP_KEYS) {
@@ -159,13 +163,7 @@ MemTable::~MemTable() {
         }
     }
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
-    _insert_mem_tracker->release(_mem_usage);
-    _flush_mem_tracker->set_consumption(0);
-    DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl
-                                                     << _insert_mem_tracker->log_usage();
-    DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
     _arena.reset();
-    _agg_buffer_pool.clear();
     _vec_row_comparator.reset();
     _row_in_blocks.clear();
     _agg_functions.clear();
@@ -180,6 +178,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r
 
 Status MemTable::insert(const vectorized::Block* input_block,
                         const std::vector<uint32_t>& row_idxs) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     if (_is_first_insertion) {
         _is_first_insertion = false;
         auto clone_block = input_block->clone_without_columns(&_column_offset);
@@ -214,10 +213,6 @@ Status MemTable::insert(const vectorized::Block* input_block,
                                                   row_idxs.data() + num_rows, &_column_offset));
     auto block_size1 = _input_mutable_block.allocated_bytes();
     g_memtable_input_block_allocated_size << block_size1 - block_size0;
-    auto input_size = size_t(input_block->bytes() * num_rows / input_block->rows() *
-                             config::memtable_insert_memory_ratio);
-    _mem_usage += input_size;
-    _insert_mem_tracker->consume(input_size);
     for (int i = 0; i < num_rows; i++) {
         _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
     }
@@ -467,10 +462,6 @@ void MemTable::_aggregate() {
     }
     if constexpr (!is_final) {
         // if is not final, we collect the agg results to input_block and then continue to insert
-        size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
-        // flush will not run here, so will not duplicate `_flush_mem_tracker`
-        _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage);
-        _mem_usage = shrunked_after_agg;
         _input_mutable_block.swap(_output_mutable_block);
         //TODO(weixang):opt here.
         std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0);
@@ -483,6 +474,7 @@ void MemTable::_aggregate() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     if (_keys_type == KeysType::DUP_KEYS) {
         return;
     }
@@ -532,8 +524,8 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
     }
     g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
     _input_mutable_block.clear();
-    _insert_mem_tracker->release(_mem_usage);
-    _mem_usage = 0;
+    // After to_block, all data in the arena is saved in the block
+    _arena.reset();
     *res = vectorized::Block::create_unique(_output_mutable_block.to_block());
     return Status::OK();
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 70f7a9f22a0..4ae92c2d2d8 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -47,6 +47,11 @@ class TabletSchema;
 class TupleDescriptor;
 enum KeysType : int;
 
+// ACTIVE: the memtable is currently used by the writer to insert blocks
+// WRITE_FINISHED: the memtable has finished writing blocks and is in the queue waiting for flush
+// FLUSH: the memtable is being flushed, writing its segment to disk
+enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 };
+
 // row pos in _input_mutable_block
 struct RowInBlock {
     size_t _row_pos;
@@ -171,16 +176,11 @@ class MemTable {
 public:
     MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-             bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
-             const std::shared_ptr<MemTracker>& insert_mem_tracker,
-             const std::shared_ptr<MemTracker>& flush_mem_tracker);
+             bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet_id; }
-    size_t memory_usage() const {
-        return _insert_mem_tracker->consumption() + _arena->used_size() +
-               _flush_mem_tracker->consumption();
-    }
+    size_t memory_usage() const { return _mem_tracker->consumption(); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
 
@@ -196,10 +196,16 @@ public:
 
     const MemTableStat& stat() { return _stat; }
 
-    std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; }
-
     QueryThreadContext query_thread_context() { return _query_thread_context; }
 
+    std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
+
+    void set_flush_success() { _is_flush_success = true; }
+
+    MemType get_mem_type() { return _mem_type; }
+
+    void update_mem_type(MemType memtype) { _mem_type = memtype; }
+
 private:
     // for vectorized
     void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
@@ -209,9 +215,11 @@ private:
     Status _to_block(std::unique_ptr<vectorized::Block>* res);
 
 private:
+    std::atomic<MemType> _mem_type;
     int64_t _tablet_id;
     bool _enable_unique_key_mow = false;
     bool _is_partial_update = false;
+    bool _is_flush_success = false;
     const KeysType _keys_type;
     std::shared_ptr<TabletSchema> _tablet_schema;
 
@@ -219,18 +227,11 @@ private:
 
     QueryThreadContext _query_thread_context;
 
-    // `_insert_manual_mem_tracker` manually records the memory value of memtable insert()
-    // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook.
-    // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable
-    // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit.
-    std::shared_ptr<MemTracker> _insert_mem_tracker;
-    std::shared_ptr<MemTracker> _flush_mem_tracker;
+    std::shared_ptr<MemTracker> _mem_tracker;
     // Only the rows will be inserted into block can allocate memory from _arena.
     // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
     // reduce the number of segment files that are generated by current load
     std::unique_ptr<vectorized::Arena> _arena;
-    // The object buffer pool for convert tuple to row
-    ObjectPool _agg_buffer_pool;
 
     void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
                                             const TupleDescriptor* tuple_desc);
@@ -264,8 +265,6 @@ private:
     std::vector<size_t> _offsets_of_aggregate_states;
     size_t _total_size_of_aggregate_states;
     std::vector<RowInBlock*> _row_in_blocks;
-    // Memory usage without _arena.
-    size_t _mem_usage;
 
     size_t _num_columns;
     int32_t _seq_col_idx_in_block = -1;
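
The MemType comments above describe a single lifecycle per memtable: ACTIVE while
the writer inserts blocks, WRITE_FINISHED once it is queued, FLUSH while its
segment is written. A minimal sketch of that state machine, using illustrative
stand-in types rather than the actual Doris classes:

    #include <atomic>
    #include <cstdint>
    #include <memory>

    enum class MemType { ACTIVE, WRITE_FINISHED, FLUSH };

    // Illustrative stand-in for Doris's MemTracker: just a consumption counter.
    struct SimpleTracker {
        std::atomic<int64_t> consumption{0};
        void consume(int64_t bytes) { consumption += bytes; }
    };

    // Illustrative stand-in for MemTable: one tracker, one lifecycle state.
    class SimpleMemTable {
    public:
        void insert(int64_t bytes) { _tracker->consume(bytes); }       // ACTIVE phase
        void finish_write() { _type.store(MemType::WRITE_FINISHED); }  // queued for flush
        void begin_flush() { _type.store(MemType::FLUSH); }            // writing segment to disk
        MemType type() const { return _type.load(); }
        int64_t memory_usage() const { return _tracker->consumption.load(); }

    private:
        std::atomic<MemType> _type{MemType::ACTIVE};
        std::shared_ptr<SimpleTracker> _tracker = std::make_shared<SimpleTracker>();
    };

With a single tracker per memtable, MemTable::memory_usage() above reduces to one
consumption() call instead of summing the insert, arena, and flush trackers.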
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 887340eed70..dc911647be8 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -46,10 +46,10 @@ class MemtableFlushTask final : public Runnable {
     ENABLE_FACTORY_CREATOR(MemtableFlushTask);
 
 public:
-    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable,
+    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
                       int32_t segment_id, int64_t submit_task_time)
             : _flush_token(flush_token),
-              _memtable(std::move(memtable)),
+              _memtable(memtable),
               _segment_id(segment_id),
               _submit_task_time(submit_task_time) {
         g_flush_task_num << 1;
@@ -60,7 +60,7 @@ public:
     void run() override {
         auto token = _flush_token.lock();
         if (token) {
-            token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
+            token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
         } else {
             LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
         }
@@ -68,7 +68,7 @@ public:
 
 private:
     std::weak_ptr<FlushToken> _flush_token;
-    std::unique_ptr<MemTable> _memtable;
+    std::shared_ptr<MemTable> _memtable;
     int32_t _segment_id;
     int64_t _submit_task_time;
 };
@@ -83,7 +83,7 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
     return os;
 }
 
-Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
+Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
     {
         std::shared_lock rdlk(_flush_status_lock);
         DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
@@ -98,9 +98,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
         return Status::OK();
     }
     int64_t submit_task_time = MonotonicNanos();
-    auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table),
-                                                 _rowset_writer->allocate_segment_id(),
-                                                 submit_task_time);
+    auto task = MemtableFlushTask::create_shared(
+            shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
     Status ret = _thread_pool->submit(std::move(task));
     if (ret.ok()) {
         // _wait_running_task_finish was executed after this function, so no need to notify _cond here
@@ -136,20 +135,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
                   << ", memsize: " << memtable->memory_usage()
                   << ", rows: " << memtable->stat().raw_rows;
+    memtable->update_mem_type(MemType::FLUSH);
     int64_t duration_ns;
     SCOPED_RAW_TIMER(&duration_ns);
     SCOPED_ATTACH_TASK(memtable->query_thread_context());
     signal::set_signal_task_id(_rowset_writer->load_id());
     signal::tablet_id = memtable->tablet_id();
     {
+        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
         std::unique_ptr<vectorized::Block> block;
-        // During to block method, it will release old memory and create new block, so that
-        // we could not scoped it.
         RETURN_IF_ERROR(memtable->to_block(&block));
-        memtable->flush_mem_tracker()->consume(block->allocated_bytes());
-        SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
         RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
     }
+    memtable->set_flush_success();
     _memtable_stat += memtable->stat();
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
@@ -158,7 +156,7 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
     return Status::OK();
 }
 
-void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
+void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
                                  int64_t submit_task_time) {
     Defer defer {[&]() {
         std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 2d20298f800..25c5a37afba 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -61,7 +61,7 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
 public:
     FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
 
-    Status submit(std::unique_ptr<MemTable> mem_table);
+    Status submit(std::shared_ptr<MemTable> mem_table);
 
     // error has happens, so we cancel this token
     // And remove all tasks in the queue.
@@ -87,7 +87,7 @@ private:
 private:
     friend class MemtableFlushTask;
 
-    void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
+    void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
                          int64_t submit_task_time);
 
     Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
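
MemtableFlushTask above holds only a weak_ptr to its FlushToken, so a token that
is destroyed while tasks are still queued turns those tasks into no-ops instead
of use-after-free. A minimal sketch of that pattern (illustrative names, not the
Doris classes):

    #include <iostream>
    #include <memory>

    struct Token {
        void flush() { std::cout << "flushing\n"; }
    };

    struct Task {
        std::weak_ptr<Token> owner;  // does not keep the token alive
        void run() {
            if (auto token = owner.lock()) {
                token->flush();  // owner still alive
            } else {
                std::cout << "token gone, skip flush task\n";  // owner destroyed
            }
        }
    };

    int main() {
        auto token = std::make_shared<Token>();
        Task task{token};
        task.run();     // flushes
        token.reset();  // destroy the owner while the task is still "queued"
        task.run();     // safely becomes a no-op
    }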
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index ea045b1e53e..9b9ce19f895 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -20,6 +20,7 @@
 #include <bvar/bvar.h>
 
 #include "common/config.h"
+#include "olap/memtable.h"
 #include "olap/memtable_writer.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
@@ -237,13 +238,14 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     _active_writers.clear();
     for (auto it = _writers.begin(); it != _writers.end();) {
         if (auto writer = it->lock()) {
+            // The memtable is currently used by writer to insert blocks.
             auto active_usage = writer->active_memtable_mem_consumption();
             _active_mem_usage += active_usage;
             if (active_usage > 0) {
                 _active_writers.push_back(writer);
             }
             _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
-            _write_mem_usage += writer->mem_consumption(MemType::WRITE);
+            _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
             ++it;
         } else {
             *it = std::move(_writers.back());
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 59916d5f1cc..e8123c48ecc 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -133,12 +133,18 @@ Status MemTableWriter::write(const vectorized::Block* block,
 
 Status MemTableWriter::_flush_memtable_async() {
     DCHECK(_flush_token != nullptr);
-    std::unique_ptr<MemTable> memtable;
+    std::shared_ptr<MemTable> memtable;
     {
         std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
-        memtable = std::move(_mem_table);
+        memtable = _mem_table;
+        _mem_table = nullptr;
     }
-    return _flush_token->submit(std::move(memtable));
+    {
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        memtable->update_mem_type(MemType::WRITE_FINISHED);
+        _freezed_mem_tables.push_back(memtable);
+    }
+    return _flush_token->submit(memtable);
 }
 
 Status MemTableWriter::flush_async() {
@@ -187,22 +193,10 @@ Status MemTableWriter::wait_flush() {
 }
 
 void MemTableWriter::_reset_mem_table() {
-    auto mem_table_insert_tracker = std::make_shared<MemTracker>(fmt::format(
-            "MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
-            std::to_string(tablet_id()), _mem_table_num, UniqueId(_req.load_id).to_string()));
-    auto mem_table_flush_tracker = std::make_shared<MemTracker>(fmt::format(
-            "MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", std::to_string(tablet_id()),
-            _mem_table_num++, UniqueId(_req.load_id).to_string()));
-    {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
-        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
-    }
     {
         std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
         _mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
-                                      _unique_key_mow, _partial_update_info.get(),
-                                      mem_table_insert_tracker, mem_table_flush_tracker));
+                                      _unique_key_mow, _partial_update_info.get()));
     }
 
     _segment_num++;
@@ -353,15 +347,11 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
     }
     int64_t mem_usage = 0;
     {
-        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
-        if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2
-            for (const auto& mem_table_tracker : _mem_table_insert_trackers) {
-                mem_usage += mem_table_tracker->consumption();
-            }
-        }
-        if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1
-            for (const auto& mem_table_tracker : _mem_table_flush_trackers) {
-                mem_usage += mem_table_tracker->consumption();
+        std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
+        for (const auto& mem_table : _freezed_mem_tables) {
+            auto mem_table_sptr = mem_table.lock();
+            if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {
+                mem_usage += mem_table_sptr->memory_usage();
             }
         }
     }
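
With the per-memtable tracker lists gone, mem_consumption() above walks the
frozen memtables through weak_ptrs and sums the usage of those in the requested
lifecycle state; expired entries contribute nothing. A condensed sketch with
simplified stand-in types:

    #include <cstdint>
    #include <memory>
    #include <vector>

    enum class MemType { ACTIVE, WRITE_FINISHED, FLUSH };

    struct Mt {
        MemType type;
        int64_t usage;
    };

    int64_t mem_consumption(const std::vector<std::weak_ptr<Mt>>& frozen, MemType wanted) {
        int64_t total = 0;
        for (const auto& weak : frozen) {
            // Expired weak_ptrs (memtable already destroyed) simply count as 0.
            if (auto mt = weak.lock(); mt && mt->type == wanted) {
                total += mt->usage;
            }
        }
        return total;
    }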
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ee7c8e1538a..ec44348b4a9 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -57,8 +57,6 @@ namespace vectorized {
 class Block;
 } // namespace vectorized
 
-enum MemType { WRITE = 1, FLUSH = 2, ALL = 3 };
-
 // Writer for a particular (load, index, tablet).
 // This class is NOT thread-safe, external synchronization is required.
 class MemTableWriter {
@@ -123,18 +121,17 @@ private:
     Status _cancel_status;
     WriteRequest _req;
     std::shared_ptr<RowsetWriter> _rowset_writer;
-    std::unique_ptr<MemTable> _mem_table;
+    std::shared_ptr<MemTable> _mem_table;
     TabletSchemaSPtr _tablet_schema;
     bool _unique_key_mow = false;
 
     // This variable is accessed from writer thread and token flush thread
     // use a shared ptr to avoid use after free problem.
     std::shared_ptr<FlushToken> _flush_token;
-    std::vector<std::shared_ptr<MemTracker>> _mem_table_insert_trackers;
-    std::vector<std::shared_ptr<MemTracker>> _mem_table_flush_trackers;
-    SpinLock _mem_table_tracker_lock;
+    // Save the non-active memtables that are in the flush queue or under flushing.
+    std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables;
+    // The lock to protect _mem_table and _freezed_mem_tables against concurrent modification or reads
     SpinLock _mem_table_ptr_lock;
-    std::atomic<uint32_t> _mem_table_num = 1;
     QueryThreadContext _query_thread_context;
 
     std::mutex _lock;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index f0710a5e2ba..11457a7a332 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -450,25 +450,12 @@ Status Segment::_load_pk_bloom_filter() {
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS);
     DCHECK(_pk_index_meta != nullptr);
     DCHECK(_pk_index_reader != nullptr);
-    auto status = [this]() {
-        return _load_pk_bf_once.call([this] {
-            RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
-            // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
-            return Status::OK();
-        });
-    }();
-    if (!status.ok()) {
-        remove_from_segment_cache();
-    }
-    return status;
-}
 
-void Segment::remove_from_segment_cache() const {
-    if (config::disable_segment_cache) {
-        return;
-    }
-    SegmentCache::CacheKey cache_key(_rowset_id, _segment_id);
-    SegmentLoader::instance()->erase_segment(cache_key);
+    return _load_pk_bf_once.call([this] {
+        RETURN_IF_ERROR(_pk_index_reader->parse_bf(_file_reader, *_pk_index_meta));
+        // _meta_mem_usage += _pk_index_reader->get_bf_memory_size();
+        return Status::OK();
+    });
 }
 
 Status Segment::load_pk_index_and_bf() {
@@ -478,14 +465,6 @@ Status Segment::load_pk_index_and_bf() {
 }
 
 Status Segment::load_index() {
-    auto status = [this]() { return _load_index_impl(); }();
-    if (!status.ok()) {
-        remove_from_segment_cache();
-    }
-    return status;
-}
-
-Status Segment::_load_index_impl() {
     return _load_index_once.call([this] {
         if (_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr) {
             _pk_index_reader = std::make_unique<PrimaryKeyIndexReader>();
@@ -519,6 +498,32 @@ Status Segment::_load_index_impl() {
     });
 }
 
+Status Segment::healthy_status() {
+    try {
+        if (_load_index_once.has_called()) {
+            RETURN_IF_ERROR(_load_index_once.stored_result());
+        }
+        if (_load_pk_bf_once.has_called()) {
+            RETURN_IF_ERROR(_load_pk_bf_once.stored_result());
+        }
+        if (_create_column_readers_once_call.has_called()) {
+            RETURN_IF_ERROR(_create_column_readers_once_call.stored_result());
+        }
+        if (_inverted_index_file_reader_open.has_called()) {
+            RETURN_IF_ERROR(_inverted_index_file_reader_open.stored_result());
+        }
+        // This status is set at runtime, for example when something goes wrong while reading the segment iterator.
+        return _healthy_status.status();
+    } catch (const doris::Exception& e) {
+        // If there is an exception during load_xxx, we should not throw the exception directly
+        // because the caller may not be exception safe.
+        return e.to_status();
+    } catch (const std::exception& e) {
+        // The exception is not thrown by doris code.
+        return Status::InternalError("Unexpected error during load segment: {}", e.what());
+    }
+}
+
 // Return the storage datatype of related column to field.
 // Return nullptr meaning no such storage infomation for this column
 vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier,
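
healthy_status() above re-checks the stored result of every one-time load step
and converts stray exceptions into a Status. A minimal sketch of the idea;
SimpleOnce is an illustrative stand-in for the call-once helper, not the real
Doris class:

    #include <functional>
    #include <optional>
    #include <string>

    struct Status {
        std::string msg;  // empty means OK
        bool ok() const { return msg.empty(); }
    };

    // Runs a step at most once and remembers its Status (single-threaded sketch).
    class SimpleOnce {
    public:
        Status call(const std::function<Status()>& fn) {
            if (!_result) _result = fn();
            return *_result;
        }
        bool has_called() const { return _result.has_value(); }
        Status stored_result() const { return *_result; }

    private:
        std::optional<Status> _result;
    };

    // A step that never ran is not an error; a step that ran and failed is.
    Status healthy_status(const SimpleOnce& load_index, const SimpleOnce& load_bf) {
        if (load_index.has_called() && !load_index.stored_result().ok()) {
            return load_index.stored_result();
        }
        if (load_bf.has_called() && !load_bf.stored_result().ok()) {
            return load_bf.stored_result();
        }
        return Status{};  // nothing failed so far
    }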
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index a4f01873f4c..035860b9bc9 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -142,6 +142,12 @@ public:
 
     Status load_pk_index_and_bf();
 
+    void update_healthy_status(Status new_status) { _healthy_status.update(new_status); }
+    // The segment is loaded into SegmentCache and then loads its indices. If something goes wrong
+    // while loading the indices, the segment should be removed from SegmentCache; otherwise it will
+    // keep reporting errors during query. So we add a healthy status API; the caller should check
+    // the healthy status before using the segment.
+    Status healthy_status();
+
     std::string min_key() {
         DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _pk_index_meta != nullptr);
         return _pk_index_meta->min_key();
@@ -155,8 +161,6 @@ public:
 
     int64_t meta_mem_usage() const { return _meta_mem_usage; }
 
-    void remove_from_segment_cache() const;
-
     // Identify the column by unique id or path info
     struct ColumnIdentifier {
         int32_t unique_id = -1;
@@ -222,7 +226,6 @@ private:
     Status _write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
                              io::IOContext& io_ctx);
 
-    Status _load_index_impl();
     Status _open_inverted_index();
 
     Status _create_column_readers_once();
@@ -233,6 +236,7 @@ private:
     io::FileReaderSPtr _file_reader;
     uint32_t _segment_id;
     uint32_t _num_rows;
+    AtomicStatus _healthy_status;
 
     // 1. Tracking memory use by segment meta data such as footer or index page.
     // 2. Tracking memory use by segment column reader
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 3d8e06bbc00..04ec5830d28 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -270,8 +270,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr sc
 
 Status SegmentIterator::init(const StorageReadOptions& opts) {
     auto status = _init_impl(opts);
-    if (!status.ok() && !config::disable_segment_cache) {
-        _segment->remove_from_segment_cache();
+    if (!status.ok()) {
+        _segment->update_healthy_status(status);
     }
     return status;
 }
@@ -1925,7 +1925,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
 
     // if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
     if (!status.ok() && !status.is<END_OF_FILE>()) {
-        _segment->remove_from_segment_cache();
+        _segment->update_healthy_status(status);
     }
     return status;
 }
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index fd7e3f476ad..abc82c6f3ee 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -59,8 +59,14 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
     for (int64_t i = 0; i < rowset->num_segments(); i++) {
         SegmentCache::CacheKey cache_key(rowset->rowset_id(), i);
         if (_segment_cache->lookup(cache_key, cache_handle)) {
-            continue;
+            // Has to check the segment status here, because the segment in cache may have gone
+            // wrong during load index or create column reader.
+            // Do not merge this if logic with the previous one, to keep the logic clear.
+            if (cache_handle->pop_unhealthy_segment() == nullptr) {
+                continue;
+            }
         }
+        // If the segment is not healthy, create a new segment to replace the unhealthy one in SegmentCache.
         segment_v2::SegmentSharedPtr segment;
         RETURN_IF_ERROR(rowset->load_segment(i, &segment));
         if (need_load_pk_index_and_bf) {
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index d177024242d..b3b88fa7700 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -161,6 +161,18 @@ public:
         _init = true;
     }
 
+    segment_v2::SegmentSharedPtr pop_unhealthy_segment() {
+        if (segments.empty()) {
+            return nullptr;
+        }
+        segment_v2::SegmentSharedPtr last_segment = segments.back();
+        if (last_segment->healthy_status().ok()) {
+            return nullptr;
+        }
+        segments.pop_back();
+        return last_segment;
+    }
+
 private:
     std::vector<segment_v2::SegmentSharedPtr> segments;
     bool _init {false};
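
pop_unhealthy_segment() above lets SegmentLoader::load_segments treat an
unhealthy cache hit like a miss: the bad entry is popped and a freshly loaded
segment replaces it. A simplified sketch of that lookup path (the cache and
segment types here are stand-ins, not the real SegmentCache):

    #include <map>
    #include <memory>

    struct Seg {
        bool healthy = true;
    };
    using SegPtr = std::shared_ptr<Seg>;

    // Stand-in for rowset->load_segment(): builds a fresh, healthy segment.
    SegPtr load_segment(int /*id*/) { return std::make_shared<Seg>(); }

    SegPtr get_segment(std::map<int, SegPtr>& cache, int id) {
        auto it = cache.find(id);
        if (it != cache.end() && it->second->healthy) {
            return it->second;  // healthy cache hit
        }
        // Miss, or the cached segment went unhealthy: reload and replace the entry.
        SegPtr fresh = load_segment(id);
        cache[id] = fresh;
        return fresh;
    }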
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index f8c11639719..1ac7753b197 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -64,7 +64,6 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
             if (workload_group_ptr) {
                 wg_ptr = workload_group_ptr;
                 wg_ptr->add_mem_tracker_limiter(mem_tracker);
-                _need_release_memtracker = true;
             }
         }
     }
@@ -85,12 +84,6 @@ LoadChannel::~LoadChannel() {
         rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
                  << ", num_rows_filtered: " << entry.second.second;
     }
-    if (_need_release_memtracker) {
-        WorkloadGroupPtr wg_ptr = _query_thread_context.get_workload_group_ptr();
-        if (wg_ptr) {
-            wg_ptr->remove_mem_tracker_limiter(_query_thread_context.get_memory_tracker());
-        }
-    }
     LOG(INFO) << "load channel removed"
               << " load_id=" << _load_id << ", is high priority=" << 
_is_high_priority
               << ", sender_ip=" << _sender_ip << rows_str.str();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 6fad8c536ec..6c150ed74d9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -127,7 +127,6 @@ private:
     int64_t _backend_id;
 
     bool _enable_profile;
-    bool _need_release_memtracker = false;
 };
 
 inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index faf354cca4c..251a7c25a74 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -123,9 +123,6 @@ public:
     bool is_query_cancelled() { return _is_query_cancelled; }
     void set_is_query_cancelled(bool is_cancelled) { _is_query_cancelled.store(is_cancelled); }
 
-    // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
-    std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;
-
     /*
     * Part 3, Memory tracking method (use carefully!)
     *
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 497041ac17b..046de58fe5f 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -159,8 +159,6 @@ QueryContext::~QueryContext() {
     uint64_t group_id = 0;
     if (_workload_group) {
         group_id = _workload_group->id(); // before remove
-        _workload_group->remove_mem_tracker_limiter(query_mem_tracker);
-        _workload_group->remove_query(_query_id);
     }

     _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 329366766f8..4d458cd440f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -446,7 +446,7 @@ void BaseTabletsChannel::refresh_profile() {
     {
         std::lock_guard<SpinLock> l(_tablet_writers_lock);
         for (auto&& [tablet_id, writer] : _tablet_writers) {
-            int64_t write_mem = writer->mem_consumption(MemType::WRITE);
+            int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED);
             write_mem_usage += write_mem;
             int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
             flush_mem_usage += flush_mem;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 6f3b51f09fd..0488e9ec83c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -144,21 +144,32 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
     }
 }
 
+// MemTrackerLimiter is not removed during query context release, so it should be removed here.
 int64_t WorkloadGroup::make_memory_tracker_snapshots(
         std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
     int64_t used_memory = 0;
     for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
         std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
-        for (const auto& trackerWptr : mem_tracker_group.trackers) {
-            auto tracker = trackerWptr.lock();
-            CHECK(tracker != nullptr);
-            if (tracker_snapshots != nullptr) {
-                tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+        for (auto trackerWptr = mem_tracker_group.trackers.begin();
+             trackerWptr != mem_tracker_group.trackers.end();) {
+            auto tracker = trackerWptr->lock();
+            if (tracker == nullptr) {
+                trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+            } else {
+                if (tracker_snapshots != nullptr) {
+                    tracker_snapshots->insert(tracker_snapshots->end(), tracker);
+                }
+                used_memory += tracker->consumption();
+                ++trackerWptr;
             }
-            used_memory += tracker->consumption();
         }
     }
-    refresh_memory(used_memory);
+    // refresh total memory used.
+    _total_mem_used = used_memory;
+    // reserve memory is recorded in the query mem tracker
+    // and _total_mem_used already contains all the current reserve memory.
+    // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
+    _wg_refresh_interval_memory_growth.store(0.0);
     _mem_used_status->set_value(used_memory);
     return used_memory;
 }
@@ -167,35 +178,38 @@ int64_t WorkloadGroup::memory_used() {
     return make_memory_tracker_snapshots(nullptr);
 }
 
-void WorkloadGroup::refresh_memory(int64_t used_memory) {
-    // refresh total memory used.
-    _total_mem_used = used_memory;
-    // reserve memory is recorded in the query mem tracker
-    // and _total_mem_used already contains all the current reserve memory.
-    // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
-    _wg_refresh_interval_memory_growth.store(0.0);
-}
+void WorkloadGroup::do_sweep() {
+    // Clear memtracker limiter that is registered during query or load.
+    for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
+        std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+        for (auto trackerWptr = mem_tracker_group.trackers.begin();
+             trackerWptr != mem_tracker_group.trackers.end();) {
+            auto tracker = trackerWptr->lock();
+            if (tracker == nullptr) {
+                trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
+            } else {
+                ++trackerWptr;
+            }
+        }
+    }
 
-void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+    // Clear query context that is registered during query context ctor
     std::unique_lock<std::shared_mutex> wlock(_mutex);
-    auto group_num = mem_tracker_ptr->group_num();
-    std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
-    mem_tracker_ptr->wg_tracker_limiter_group_it =
-            _mem_tracker_limiter_pool[group_num].trackers.insert(
-                    _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
+    for (auto iter = _query_ctxs.begin(); iter != _query_ctxs.end();) {
+        if (iter->second.lock() == nullptr) {
+            iter = _query_ctxs.erase(iter);
+        } else {
+            iter++;
+        }
+    }
 }
 
-void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
     std::unique_lock<std::shared_mutex> wlock(_mutex);
     auto group_num = mem_tracker_ptr->group_num();
     std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
-    if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
-        _mem_tracker_limiter_pool[group_num].trackers.end()) {
-        _mem_tracker_limiter_pool[group_num].trackers.erase(
-                mem_tracker_ptr->wg_tracker_limiter_group_it);
-        mem_tracker_ptr->wg_tracker_limiter_group_it =
-                _mem_tracker_limiter_pool[group_num].trackers.end();
-    }
+    _mem_tracker_limiter_pool[group_num].trackers.insert(
+            _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
 }
 
 int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc) {
@@ -230,14 +244,16 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
     auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
                                                   const std::string& label) {
         return fmt::format(
-                "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}, Execute "
+                "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}, "
+                "Execute "
                 "again after enough memory, details see be.INFO.",
                 cancel_str, label, MemCounter::print_bytes(mem_consumption),
                 GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
     };
     auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) {
         return fmt::format(
-                "{} cancel top memory used tracker <{}> consumption {}. 
details:{}, Execute again "
+                "{} cancel top memory used tracker <{}> consumption {}. 
details:{}, Execute "
+                "again "
                 "after enough memory, details see be.INFO.",
                cancel_str, label, MemCounter::print_bytes(mem_consumption),
                GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
@@ -249,7 +265,8 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile,
             _id, _name, _memory_limit, used_memory, need_free_mem);
     Defer defer {[&]() {
         LOG(INFO) << fmt::format(
-                "[MemoryGC] work load group finished gc, id:{} name:{}, memory 
limit: {}, used: "
+                "[MemoryGC] work load group finished gc, id:{} name:{}, memory 
limit: {}, "
+                "used: "
                 "{}, need_free_mem: {}, freed memory: {}.",
                _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem);
     }};
@@ -542,7 +559,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                 _cgroup_cpu_ctl->update_cpu_soft_limit(
                         CgroupCpuCtl::cpu_soft_limit_default_value());
             } else {
-                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is "
+                             "illegal: "
                           << cpu_hard_limit << ", gid=" << tg_id;
             }
         } else {
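
do_sweep() above replaces the removed remove_mem_tracker_limiter()/remove_query()
calls: instead of each query erasing itself on destruction, the maintenance
thread periodically drops expired weak_ptrs. The core erase-while-iterating
idiom, with stand-in types:

    #include <list>
    #include <memory>

    struct Tracker {};

    void sweep(std::list<std::weak_ptr<Tracker>>& trackers) {
        for (auto it = trackers.begin(); it != trackers.end();) {
            if (it->expired()) {
                it = trackers.erase(it);  // owning query/load already finished
            } else {
                ++it;
            }
        }
    }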
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 2fbb4dd3030..933c5afdb4e 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -89,7 +89,8 @@ public:
             std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
     // call make_memory_tracker_snapshots, so also refresh total memory used.
     int64_t memory_used();
-    void refresh_memory(int64_t used_memory);
+
+    void do_sweep();
 
     int spill_threshold_low_water_mark() const {
         return _spill_low_watermark.load(std::memory_order_relaxed);
@@ -132,8 +133,6 @@ public:
 
     void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
 
-    void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
-
     // when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC
     // because mem_limit is not a required property
     bool is_mem_limit_valid() {
@@ -154,11 +153,6 @@ public:
         return Status::OK();
     }
 
-    void remove_query(TUniqueId query_id) {
-        std::unique_lock<std::shared_mutex> wlock(_mutex);
-        _query_ctxs.erase(query_id);
-    }
-
     void shutdown() {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
         _is_shutdown = true;
@@ -169,11 +163,6 @@ public:
         return _is_shutdown && _query_ctxs.empty();
     }
 
-    int query_num() {
-        std::shared_lock<std::shared_mutex> r_lock(_mutex);
-        return _query_ctxs.size();
-    }
-
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile, bool is_minor_gc);
 
     void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 65a8e3685c8..003f07f1db0 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -136,6 +136,13 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
               << ", before wg size=" << old_wg_size << ", after wg size=" << 
new_wg_size;
 }
 
+void WorkloadGroupMgr::do_sweep() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    for (auto& [wg_id, wg] : _workload_groups) {
+        wg->do_sweep();
+    }
+}
+
 struct WorkloadGroupMemInfo {
     int64_t total_mem_used = 0;
     std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index d8547c3383e..f76e98d2606 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -50,6 +50,8 @@ public:
 
     WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
 
+    void do_sweep();
+
     void stop();
 
     std::atomic<bool> _enable_cpu_hard_limit = false;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 06dad46e90c..8dc4d7bb3c2 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1669,17 +1669,13 @@ void PInternalService::reset_rpc_channel(google::protobuf::RpcController* contro
 void PInternalService::hand_shake(google::protobuf::RpcController* controller,
                                   const PHandShakeRequest* request, PHandShakeResponse* response,
                                   google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        if (request->has_hello()) {
-            response->set_hello(request->hello());
-        }
-        response->mutable_status()->set_status_code(0);
-    });
-    if (!ret) {
-        offer_failed(response, done, _light_work_pool);
-        return;
+    // The light pool may be full. Handshake is used to check the connection state of brpc.
+    // It should not be affected by the thread pool logic.
+    brpc::ClosureGuard closure_guard(done);
+    if (request->has_hello()) {
+        response->set_hello(request->hello());
     }
+    response->mutable_status()->set_status_code(0);
 }
 
 constexpr char HttpProtocol[] = "http://";
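
The hand_shake change above answers directly on the RPC thread instead of
offloading to _light_work_pool, so a saturated pool can no longer fail what is
essentially a connectivity probe. A sketch of the resulting handler logic (bRPC
plumbing elided; the types here are stand-ins):

    #include <optional>
    #include <string>

    struct Response {
        std::optional<std::string> hello;
        int status_code = -1;
    };

    // Echo the optional payload and report OK, with no thread-pool hop involved.
    void hand_shake_inline(const std::optional<std::string>& request_hello, Response* response) {
        if (request_hello) {
            response->hello = *request_hello;
        }
        response->status_code = 0;
    }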

