liaoxin01 commented on code in PR #14214: URL: https://github.com/apache/doris/pull/14214#discussion_r1020707841
########## be/src/runtime/load_channel_mgr.h: ########## @@ -171,77 +178,115 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response) MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { return Status::OK(); } - // Pick load channel to reduce memory. - std::shared_ptr<LoadChannel> channel; // Indicate whether current thread is reducing mem on hard limit. bool reducing_mem_on_hard_limit = false; + std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem; { std::unique_lock<std::mutex> l(_lock); while (_should_wait_flush) { LOG(INFO) << "Reached the load hard limit " << _load_hard_mem_limit << ", waiting for flush"; _wait_flush_cond.wait(l); } + bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit || + MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit; // Some other thread is flushing data, and not reached hard limit now, // we don't need to handle mem limit in current thread. - if (_reduce_memory_channel != nullptr && - _mem_tracker->consumption() < _load_hard_mem_limit && - MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { + if (_soft_reduce_mem_in_progress && !hard_limit_reached) { return Status::OK(); } - // We need to pick a LoadChannel to reduce memory usage. - // If `_reduce_memory_channel` is not null, it means the hard limit is - // exceed now, we still need to pick a load channel again. Because - // `_reduce_memory_channel` might not be the largest consumer now. - int64_t max_consume = 0; + // Pick LoadChannels to reduce memory usage, if some other thread is reducing memory + // due to soft limit, and we reached hard limit now, current thread may pick some + // duplicate channels and trigger duplicate reducing memory process. + // But the load channel's reduce memory process is thread safe, only 1 thread can + // reduce memory at the same time, other threads will wait on a condition variable, + // after the reduce-memory work finished, all threads will return. 
+ using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>; + std::vector<ChannelMemPair> candidate_channels; + int64_t total_consume = 0; for (auto& kv : _load_channels) { if (kv.second->is_high_priority()) { // do not select high priority channel to reduce memory // to avoid blocking them. continue; } - if (kv.second->mem_consumption() > max_consume) { - max_consume = kv.second->mem_consumption(); - channel = kv.second; - } + int64_t mem = kv.second->mem_consumption(); + // save the mem consumption, since the calculation might be expensive. + candidate_channels.push_back(std::make_pair(kv.second, mem)); + total_consume += mem; } - if (max_consume == 0) { + + if (candidate_channels.empty()) { // should not happen, add log to observe - LOG(WARNING) << "failed to find suitable load channel when total load mem limit exceed"; + LOG(WARNING) << "All load channels are high priority, failed to find suitable" + << "channels to reduce memory when total load mem limit exceed"; return Status::OK(); } - DCHECK(channel.get() != nullptr); - _reduce_memory_channel = channel; + + // sort all load channels, try to find the largest one. + std::sort(candidate_channels.begin(), candidate_channels.end(), + [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) { + return lhs.second > rhs.second; + }); + + int64_t mem_consumption_in_picked_channel = 0; + auto largest_channel = *candidate_channels.begin(); + // If some load-channel is big enough, we can reduce it only, try our best to avoid + // reducing small load channels. + if (_load_channel_min_mem_to_reduce > 0 && + largest_channel.second > _load_channel_min_mem_to_reduce) { + // Pick 1 load channel to reduce memory. + channels_to_reduce_mem.push_back(largest_channel.first); + mem_consumption_in_picked_channel = largest_channel.second; + } else { + // Pick multiple channels to reduce memory. 
+ int64_t mem_to_flushed = total_consume / 3; + for (auto ch : candidate_channels) { + channels_to_reduce_mem.push_back(ch.first); + mem_consumption_in_picked_channel += ch.second; + if (mem_consumption_in_picked_channel >= mem_to_flushed) { + break; + } + } + } std::ostringstream oss; if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) { - oss << "reducing memory of " << *channel << " because total load mem consumption " + oss << "reducing memory of " << channels_to_reduce_mem.size() + << " load channels (total mem consumption: " << mem_consumption_in_picked_channel + << " bytes), because total load mem consumption " << PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES) << " has exceeded"; if (_mem_tracker->consumption() > _load_hard_mem_limit) { _should_wait_flush = true; reducing_mem_on_hard_limit = true; oss << " hard limit: " << PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES); } else { + _soft_reduce_mem_in_progress = true; oss << " soft limit: " << PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES); } } else { _should_wait_flush = true; reducing_mem_on_hard_limit = true; - oss << "reducing memory of " << *channel << " because process memory used " - << PerfCounters::get_vm_rss_str() << " has exceeded limit " + oss << "reducing memory of " << channels_to_reduce_mem.size() + << " load channels (total mem consumption: " << mem_consumption_in_picked_channel + << " bytes), because " << PerfCounters::get_vm_rss_str() << " has exceeded limit " << PrettyPrinter::print(process_mem_limit, TUnit::BYTES) << " , tc/jemalloc allocator cache " << MemInfo::allocator_cache_mem_str(); } LOG(INFO) << oss.str(); } - // No matter soft limit or hard limit reached, only 1 thread will wait here, - // if hard limit reached, other threads will pend at the beginning of this - // method. 
- Status st = channel->handle_mem_exceed_limit(response); - LOG(INFO) << "reduce memory of " << *channel << " finished"; + Status st = Status::OK(); + for (auto ch : channels_to_reduce_mem) { + uint64_t begin = GetCurrentTimeMicros(); + int64_t mem_usage = ch->mem_consumption(); + st = ch->handle_mem_exceed_limit(response); Review Comment: Only the last `st` is returned: because `st` is reassigned on every iteration, a non-OK status from an earlier channel's `handle_mem_exceed_limit` is overwritten by later channels and the error is lost. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org