HappenLee commented on code in PR #37462: URL: https://github.com/apache/doris/pull/37462#discussion_r1675068836
########## be/src/pipeline/exec/multi_cast_data_streamer.cpp: ########## @@ -23,63 +23,97 @@ namespace doris::pipeline { -MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size) - : _used_count(used_count), _mem_size(mem_size) { +MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy, + size_t mem_size) + : _used_count(used_count), _un_finish_copy(un_finish_copy), _mem_size(mem_size) { _block = vectorized::Block::create_unique(block->get_columns_with_type_and_name()); block->clear(); } Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { - std::lock_guard l(_mutex); - auto& pos_to_pull = _sender_pos_to_read[sender_idx]; - if (pos_to_pull != _multi_cast_blocks.end()) { - if (pos_to_pull->_used_count == 1) { - DCHECK(pos_to_pull == _multi_cast_blocks.begin()); - pos_to_pull->_block->swap(*block); - - _cumulative_mem_size -= pos_to_pull->_mem_size; - pos_to_pull++; - _multi_cast_blocks.pop_front(); - } else { - pos_to_pull->_block->create_same_struct_block(0)->swap(*block); - RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block)); - pos_to_pull->_used_count--; - pos_to_pull++; + int* un_finish_copy = nullptr; + int use_count = 0; + { + std::lock_guard l(_mutex); + auto& pos_to_pull = _sender_pos_to_read[sender_idx]; + const auto end = _multi_cast_blocks.end(); + DCHECK(pos_to_pull != end); + + *block = *pos_to_pull->_block; + + _cumulative_mem_size -= pos_to_pull->_mem_size; + + pos_to_pull->_used_count--; + use_count = pos_to_pull->_used_count; + un_finish_copy = &pos_to_pull->_un_finish_copy; + + pos_to_pull++; + + if (pos_to_pull == end) { + _block_reading(sender_idx); } + + *eos = _eos and pos_to_pull == end; } - *eos = _eos and pos_to_pull == _multi_cast_blocks.end(); - if (pos_to_pull == _multi_cast_blocks.end()) { - _block_reading(sender_idx); + + if (use_count == 0) { + // will clear _multi_cast_blocks + _wait_copy_block(block, *un_finish_copy); + } else { + _copy_block(block, *un_finish_copy); } + return Status::OK(); } +void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish_copy) { + const auto rows = block->rows(); + for (int i = 0; i < block->columns(); ++i) { + block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows); + } + + std::unique_lock l(_mutex); + un_finish_copy--; + if (un_finish_copy == 0) { + l.unlock(); + _cv.notify_one(); + } +} + +void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) { + std::unique_lock l(_mutex); + _cv.wait(l, [&]() { return un_finish_copy == 0; }); Review Comment: danger will notify different queue last -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org