dataroaring commented on code in PR #12716: URL: https://github.com/apache/doris/pull/12716#discussion_r976130914
########## be/src/runtime/load_channel.h: ########## @@ -174,20 +171,20 @@ inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channe } template <typename TabletWriterAddResult> -Status LoadChannel::handle_mem_exceed_limit(bool force, TabletWriterAddResult* response) { - // lock so that only one thread can check mem limit - std::lock_guard<std::mutex> l(_lock); - if (!(force || _mem_tracker->limit_exceeded())) { - return Status::OK(); - } +Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) { + bool found = false; + std::shared_ptr<TabletsChannel> channel; + { + // lock so that only one thread can check mem limit + std::lock_guard<std::mutex> l(_lock); - if (!force) { - LOG(INFO) << "reducing memory of " << *this << " because its mem consumption " - << _mem_tracker->consumption() << " has exceeded limit " << _mem_tracker->limit(); + LOG(INFO) << "reducing memory of " << *this + << " ,mem consumption: " << _mem_tracker->consumption(); Review Comment: Put this line of log following _find_largest_consumption_channel with chosen channel info? ########## be/src/runtime/load_channel_mgr.h: ########## @@ -143,36 +151,75 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, template <typename TabletWriterAddResult> Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) { - // lock so that only one thread can check mem limit - std::lock_guard<std::mutex> l(_lock); - if (!_mem_tracker->limit_exceeded()) { - return Status::OK(); - } - - int64_t max_consume = 0; + _pending_if_hard_limit_exceeded(); + // Check limit and pick load channel to reduce memory. std::shared_ptr<LoadChannel> channel; - for (auto& kv : _load_channels) { - if (kv.second->is_high_priority()) { - // do not select high priority channel to reduce memory - // to avoid blocking them. - continue; + { + std::lock_guard<std::mutex> l(_lock); + // Check the soft limit. 
+ if (_mem_tracker->consumption() < _load_process_soft_limit) { + return Status::OK(); } Review Comment: Can we move the above `if` check outside of _lock? ########## be/src/runtime/load_channel_mgr.h: ########## @@ -76,10 +77,17 @@ class LoadChannelMgr { std::mutex _lock; // load id -> load channel std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels; + std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr; Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend std::shared_ptr<MemTrackerLimiter> _mem_tracker; + int64_t _load_process_soft_limit = -1; + + // If hard limit reached, one thread will trigger load channel flush, + // other threads should wait on the condition variable. + bool _hard_limit_reached = false; Review Comment: It seems that _reduce_memory_channel is enough? ########## be/src/runtime/tablets_channel.cpp: ########## @@ -316,6 +333,13 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request return Status::OK(); } +void TabletsChannel::_pending_on_reduce_mem_usage() { Review Comment: The method is not called? ########## be/src/runtime/load_channel_mgr.h: ########## @@ -76,10 +77,17 @@ class LoadChannelMgr { std::mutex _lock; // load id -> load channel std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels; + std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr; Cache* _last_success_channel = nullptr; // check the total load channel mem consumption of this Backend std::shared_ptr<MemTrackerLimiter> _mem_tracker; + int64_t _load_process_soft_limit = -1; + + // If hard limit reached, one thread will trigger load channel flush, + // other threads should wait on the condition variable. + bool _hard_limit_reached = false; Review Comment: We should just use the memtracker as the indicator that the limit has been reached.
Here, the variable indicates that one thread is flushing the memtable because the hard limit was reached, so maybe we should name the variable '_is_flushing_channel'. ########## be/src/runtime/load_channel_mgr.h: ########## @@ -143,36 +151,75 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, template <typename TabletWriterAddResult> Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) { - // lock so that only one thread can check mem limit - std::lock_guard<std::mutex> l(_lock); - if (!_mem_tracker->limit_exceeded()) { - return Status::OK(); - } - - int64_t max_consume = 0; + _pending_if_hard_limit_exceeded(); Review Comment: Would try_to_wait_flushing be a better name? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org