dataroaring commented on code in PR #12716:
URL: https://github.com/apache/doris/pull/12716#discussion_r976130914


##########
be/src/runtime/load_channel.h:
##########
@@ -174,20 +171,20 @@ inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channe
 }
 
 template <typename TabletWriterAddResult>
-Status LoadChannel::handle_mem_exceed_limit(bool force, TabletWriterAddResult* response) {
-    // lock so that only one thread can check mem limit
-    std::lock_guard<std::mutex> l(_lock);
-    if (!(force || _mem_tracker->limit_exceeded())) {
-        return Status::OK();
-    }
+Status LoadChannel::handle_mem_exceed_limit(TabletWriterAddResult* response) {
+    bool found = false;
+    std::shared_ptr<TabletsChannel> channel;
+    {
+        // lock so that only one thread can check mem limit
+        std::lock_guard<std::mutex> l(_lock);
 
-    if (!force) {
-        LOG(INFO) << "reducing memory of " << *this << " because its mem consumption "
-                  << _mem_tracker->consumption() << " has exceeded limit " << _mem_tracker->limit();
+        LOG(INFO) << "reducing memory of " << *this
+                  << " ,mem consumption: " << _mem_tracker->consumption();

Review Comment:
   Should this log line be moved to after _find_largest_consumption_channel, so it can include the chosen channel's info?
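   For illustration, a minimal sketch of the suggested ordering. The signature of _find_largest_consumption_channel and the way the chosen channel is printed are assumptions, not code from this PR:

   ```cpp
   std::shared_ptr<TabletsChannel> channel;
   if (_find_largest_consumption_channel(&channel)) {
       LOG(INFO) << "reducing memory of " << *this
                 << ", mem consumption: " << _mem_tracker->consumption()
                 << ", chosen tablets channel: " << *channel;  // assumes an operator<< for TabletsChannel
   }
   ```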



##########
be/src/runtime/load_channel_mgr.h:
##########
@@ -143,36 +151,75 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
 
 template <typename TabletWriterAddResult>
 Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) {
-    // lock so that only one thread can check mem limit
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_mem_tracker->limit_exceeded()) {
-        return Status::OK();
-    }
-
-    int64_t max_consume = 0;
+    _pending_if_hard_limit_exceeded();
+    // Check limit and pick load channel to reduce memory.
     std::shared_ptr<LoadChannel> channel;
-    for (auto& kv : _load_channels) {
-        if (kv.second->is_high_priority()) {
-            // do not select high priority channel to reduce memory
-            // to avoid blocking them.
-            continue;
+    {
+        std::lock_guard<std::mutex> l(_lock);
+        // Check the soft limit.
+        if (_mem_tracker->consumption() < _load_process_soft_limit) {
+            return Status::OK();
         }

Review Comment:
   Can we move the soft-limit check above outside of _lock?
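   A minimal sketch of that suggestion, assuming the soft-limit check only reads the mem tracker and therefore does not need _lock (whether to re-check under the lock afterwards is left to the PR):

   ```cpp
   // Early exit before taking _lock; reading the tracker does not need the mutex.
   if (_mem_tracker->consumption() < _load_process_soft_limit) {
       return Status::OK();
   }
   std::shared_ptr<LoadChannel> channel;
   {
       std::lock_guard<std::mutex> l(_lock);
       // ... pick the load channel to reduce memory, as in the PR ...
   }
   ```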



##########
be/src/runtime/load_channel_mgr.h:
##########
@@ -76,10 +77,17 @@ class LoadChannelMgr {
     std::mutex _lock;
     // load id -> load channel
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
+    std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
     Cache* _last_success_channel = nullptr;
 
     // check the total load channel mem consumption of this Backend
     std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    int64_t _load_process_soft_limit = -1;
+
+    // If hard limit reached, one thread will trigger load channel flush,
+    // other threads should wait on the condition variable.
+    bool _hard_limit_reached = false;

Review Comment:
   It seems that _reduce_memory_channel is enough?
   



##########
be/src/runtime/tablets_channel.cpp:
##########
@@ -316,6 +333,13 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
     return Status::OK();
 }
 
+void TabletsChannel::_pending_on_reduce_mem_usage() {

Review Comment:
   It looks like this method is never called?



##########
be/src/runtime/load_channel_mgr.h:
##########
@@ -76,10 +77,17 @@ class LoadChannelMgr {
     std::mutex _lock;
     // load id -> load channel
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
+    std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
     Cache* _last_success_channel = nullptr;
 
     // check the total load channel mem consumption of this Backend
     std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+    int64_t _load_process_soft_limit = -1;
+
+    // If hard limit reached, one thread will trigger load channel flush,
+    // other threads should wait on the condition variable.
+    bool _hard_limit_reached = false;

Review Comment:
   We should just use the mem tracker as the indicator that the limit has been reached. Here, the flag only indicates that one thread is flushing memtables because the hard limit was reached, so maybe we should name the variable '_is_flushing_channel'.
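   A minimal sketch of the suggested rename. The condition variable and the waiter method names below are hypothetical, since the diff hunk cuts off before showing them; only _lock and the flag follow this PR:

   ```cpp
   #include <condition_variable>
   #include <mutex>

   class LoadChannelMgr {
       // ...
       std::mutex _lock;
       // Hypothetical name; the PR mentions a condition variable but the hunk does not show it.
       std::condition_variable _wait_flush_cond;
       // Suggested rename: true while one thread is flushing channels after the
       // hard limit was reached; the limit itself is read from the mem tracker.
       bool _is_flushing_channel = false;

       // Illustrative waiter: other threads block here until the flush finishes.
       void _wait_if_flushing() {
           std::unique_lock<std::mutex> l(_lock);
           while (_is_flushing_channel) {
               _wait_flush_cond.wait(l);
           }
       }
   };
   ```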



##########
be/src/runtime/load_channel_mgr.h:
##########
@@ -143,36 +151,75 @@ Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
 
 template <typename TabletWriterAddResult>
 Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) {
-    // lock so that only one thread can check mem limit
-    std::lock_guard<std::mutex> l(_lock);
-    if (!_mem_tracker->limit_exceeded()) {
-        return Status::OK();
-    }
-
-    int64_t max_consume = 0;
+    _pending_if_hard_limit_exceeded();

Review Comment:
   Would try_to_wait_flushing be a better name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

