yiguolei commented on code in PR #15624:
URL: https://github.com/apache/doris/pull/15624#discussion_r1062206515


##########
be/src/vec/common/sort/sorter.cpp:
##########
@@ -67,13 +173,71 @@ Status 
MergeSorterState::merge_sort_read(doris::RuntimeState* state,
     }
 
     if (!mem_reuse) {
-        Block merge_block = 
sorted_blocks[0].clone_with_columns(std::move(merged_columns));
+        Block merge_block = 
sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
         merge_block.swap(*block);
     }
 
     return Status::OK();
 }
 
+int MergeSorterState::_calc_spill_blocks_to_merge() const {
+    return config::external_sort_bytes_threshold / BLOCK_SPILL_BATCH_BYTES;
+}
+
+// merge all the intermediate spilled blocks
+Status MergeSorterState::_merge_spilled_blocks(const SortDescription& 
sort_description) {
+    int num_of_blocks_to_merge = _calc_spill_blocks_to_merge();
+    while (true) {
+        // pick some spilled blocks to merge, and spill the merged result
+        // to disk, until all splled blocks can be merged in a run.
+        RETURN_IF_ERROR(_create_intermediate_merger(num_of_blocks_to_merge, 
sort_description));
+        if (spilled_sorted_block_streams_.empty()) {
+            LOG(WARNING) << "spill to disk: _merge_spilled_blocks, can merger 
all spilled block in "
+                            "memory";
+            break;
+        }
+
+        bool eos = false;
+
+        BlockSpillWriterUPtr spill_block_wirter;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer(
+                spill_block_batch_size_, spill_block_wirter));
+
+        while (!eos) {
+            merge_sorted_block_.clear_column_data();
+            RETURN_IF_ERROR(merger_->get_next(&merge_sorted_block_, &eos));
+            RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_));
+        }
+        
spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id());
+        spill_block_wirter->close();
+    }
+    return Status::OK();
+}
+
+Status MergeSorterState::_create_intermediate_merger(int num_blocks,
+                                                     const SortDescription& 
sort_description) {
+    _reset_block_spill_readers();
+
+    std::vector<BlockSupplier> child_block_suppliers;
+    merger_.reset(new VSortedRunMerger(sort_description, 
spill_block_batch_size_, limit_, offset_,
+                                       profile_));
+
+    for (int i = 0; i < num_blocks && !spilled_sorted_block_streams_.empty(); 
++i) {
+        auto stream_id = spilled_sorted_block_streams_.front();
+        BlockSpillReaderUPtr spilled_block_reader;

Review Comment:
   The reader is not closed? Maybe we should call close automatically in the
   destructor of both the reader and the writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to