HappenLee commented on code in PR #35428: URL: https://github.com/apache/doris/pull/35428#discussion_r1616529805
########## be/src/vec/runtime/vdata_stream_recvr.cpp: ########## @@ -477,23 +478,38 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) { void VDataStreamRecvr::SenderQueue::add_blocks_memory_usage(int64_t size) { DCHECK(size >= 0); _recvr->_mem_tracker->consume(size); - if (_local_channel_dependency && _recvr->exceeds_limit(0)) { + _queue_mem_tracker->consume(size); + if (_local_channel_dependency && exceeds_limit()) { _local_channel_dependency->block(); } } void VDataStreamRecvr::SenderQueue::sub_blocks_memory_usage(int64_t size) { DCHECK(size >= 0); _recvr->_mem_tracker->release(size); - if (_local_channel_dependency && (!_recvr->exceeds_limit(0))) { + _queue_mem_tracker->release(size); + if (_local_channel_dependency && (!exceeds_limit())) { _local_channel_dependency->set_ready(); } } +bool VDataStreamRecvr::SenderQueue::exceeds_limit() { + const size_t queue_byte_size = _queue_mem_tracker->consumption(); + return _recvr->queue_exceeds_limit(queue_byte_size); +} + bool VDataStreamRecvr::exceeds_limit(size_t block_byte_size) { return _mem_tracker->consumption() + block_byte_size > config::exchg_node_buffer_size_bytes; } +bool VDataStreamRecvr::queue_exceeds_limit(size_t queue_byte_size) { + const size_t queue_limit = config::exchg_node_buffer_size_bytes / _sender_queues.size(); Review Comment: do not do the cal each time,it‘s cost -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org