yiguolei commented on code in PR #15624: URL: https://github.com/apache/doris/pull/15624#discussion_r1062206515
########## be/src/vec/common/sort/sorter.cpp: ########## @@ -67,13 +173,71 @@ Status MergeSorterState::merge_sort_read(doris::RuntimeState* state, } if (!mem_reuse) { - Block merge_block = sorted_blocks[0].clone_with_columns(std::move(merged_columns)); + Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns)); merge_block.swap(*block); } return Status::OK(); } +int MergeSorterState::_calc_spill_blocks_to_merge() const { + return config::external_sort_bytes_threshold / BLOCK_SPILL_BATCH_BYTES; +} + +// merge all the intermediate spilled blocks +Status MergeSorterState::_merge_spilled_blocks(const SortDescription& sort_description) { + int num_of_blocks_to_merge = _calc_spill_blocks_to_merge(); + while (true) { + // pick some spilled blocks to merge, and spill the merged result + // to disk, until all splled blocks can be merged in a run. + RETURN_IF_ERROR(_create_intermediate_merger(num_of_blocks_to_merge, sort_description)); + if (spilled_sorted_block_streams_.empty()) { + LOG(WARNING) << "spill to disk: _merge_spilled_blocks, can merger all spilled block in " + "memory"; + break; + } + + bool eos = false; + + BlockSpillWriterUPtr spill_block_wirter; + RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer( + spill_block_batch_size_, spill_block_wirter)); + + while (!eos) { + merge_sorted_block_.clear_column_data(); + RETURN_IF_ERROR(merger_->get_next(&merge_sorted_block_, &eos)); + RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_)); + } + spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id()); + spill_block_wirter->close(); + } + return Status::OK(); +} + +Status MergeSorterState::_create_intermediate_merger(int num_blocks, + const SortDescription& sort_description) { + _reset_block_spill_readers(); + + std::vector<BlockSupplier> child_block_suppliers; + merger_.reset(new VSortedRunMerger(sort_description, spill_block_batch_size_, limit_, offset_, + profile_)); + + for (int i = 0; i < 
num_blocks && !spilled_sorted_block_streams_.empty(); ++i) { + auto stream_id = spilled_sorted_block_streams_.front(); + BlockSpillReaderUPtr spilled_block_reader; Review Comment: The reader is not closed? Maybe we should call close automatically in the destructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org