HappenLee commented on code in PR #37462:
URL: https://github.com/apache/doris/pull/37462#discussion_r1675068836


##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -23,63 +23,97 @@
 
 namespace doris::pipeline {
 
-MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, 
size_t mem_size)
-        : _used_count(used_count), _mem_size(mem_size) {
+MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int 
un_finish_copy,
+                               size_t mem_size)
+        : _used_count(used_count), _un_finish_copy(un_finish_copy), 
_mem_size(mem_size) {
     _block = 
vectorized::Block::create_unique(block->get_columns_with_type_and_name());
     block->clear();
 }
 
 Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* 
block, bool* eos) {
-    std::lock_guard l(_mutex);
-    auto& pos_to_pull = _sender_pos_to_read[sender_idx];
-    if (pos_to_pull != _multi_cast_blocks.end()) {
-        if (pos_to_pull->_used_count == 1) {
-            DCHECK(pos_to_pull == _multi_cast_blocks.begin());
-            pos_to_pull->_block->swap(*block);
-
-            _cumulative_mem_size -= pos_to_pull->_mem_size;
-            pos_to_pull++;
-            _multi_cast_blocks.pop_front();
-        } else {
-            pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
-            
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
-            pos_to_pull->_used_count--;
-            pos_to_pull++;
+    int* un_finish_copy = nullptr;
+    int use_count = 0;
+    {
+        std::lock_guard l(_mutex);
+        auto& pos_to_pull = _sender_pos_to_read[sender_idx];
+        const auto end = _multi_cast_blocks.end();
+        DCHECK(pos_to_pull != end);
+
+        *block = *pos_to_pull->_block;
+
+        _cumulative_mem_size -= pos_to_pull->_mem_size;
+
+        pos_to_pull->_used_count--;
+        use_count = pos_to_pull->_used_count;
+        un_finish_copy = &pos_to_pull->_un_finish_copy;
+
+        pos_to_pull++;
+
+        if (pos_to_pull == end) {
+            _block_reading(sender_idx);
         }
+
+        *eos = _eos and pos_to_pull == end;
     }
-    *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
-    if (pos_to_pull == _multi_cast_blocks.end()) {
-        _block_reading(sender_idx);
+
+    if (use_count == 0) {
+        // will clear _multi_cast_blocks
+        _wait_copy_block(block, *un_finish_copy);
+    } else {
+        _copy_block(block, *un_finish_copy);
     }
+
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& 
un_finish_copy) {
+    const auto rows = block->rows();
+    for (int i = 0; i < block->columns(); ++i) {
+        block->get_by_position(i).column = 
block->get_by_position(i).column->clone_resized(rows);
+    }
+
+    std::unique_lock l(_mutex);
+    un_finish_copy--;
+    if (un_finish_copy == 0) {
+        l.unlock();
+        _cv.notify_one();
+    }
+}
+
+void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& 
un_finish_copy) {
+    std::unique_lock l(_mutex);
+    _cv.wait(l, [&]() { return un_finish_copy == 0; });

Review Comment:
   danger will notify different queue last



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to