yiguolei commented on code in PR #15624: URL: https://github.com/apache/doris/pull/15624#discussion_r1062199064
########## be/src/vec/common/sort/sorter.cpp: ########## @@ -17,48 +17,154 @@ #include "vec/common/sort/sorter.h" +#include "runtime/block_spill_manager.h" #include "runtime/thread_context.h" namespace doris::vectorized { -void MergeSorterState::build_merge_tree(SortDescription& sort_description) { - for (const auto& block : sorted_blocks) { - cursors.emplace_back(block, sort_description); +// When doing spillable sorting, each sorted block is spilled into a single file. +// +// In order to decrease memory pressure when merging +// multiple spilled blocks into one bigger sorted block, only part +// of each spilled blocks are read back into memory at a time. +// +// Currently the spilled blocks are splitted into small sub blocks, +// each sub block is serialized in PBlock format and appended +// to the spill file. +// +// This number specifies the maximum size of sub blocks +static constexpr int BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024; + +Status MergeSorterState::add_sorted_block(Block& block) { + auto rows = block.rows(); + if (0 == rows) { + return Status::OK(); + } + if (0 == avg_row_bytes_) { + avg_row_bytes_ = block.bytes() / rows; + spill_block_batch_size_ = (BLOCK_SPILL_BATCH_BYTES + avg_row_bytes_ - 1) / avg_row_bytes_; } - if (sorted_blocks.size() > 1) { - for (auto& cursor : cursors) priority_queue.push(MergeSortCursor(&cursor)); + auto bytes_used = data_size(); + if (is_spilled_ || + (config::external_sort_bytes_threshold > 0 && + (bytes_used + block.allocated_bytes()) >= + (config::external_sort_bytes_threshold - BLOCK_SPILL_BATCH_BYTES))) { + is_spilled_ = true; + BlockSpillWriterUPtr spill_block_wirter; + RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer( + spill_block_batch_size_, spill_block_wirter)); + + RETURN_IF_ERROR(spill_block_wirter->write(block)); + spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id()); + + spill_block_wirter->close(); + + if (init_merge_sorted_block_) { + init_merge_sorted_block_ = 
false; + merge_sorted_block_ = block.clone_empty(); + } + } else { + sorted_blocks_.emplace_back(std::move(block)); } + num_rows_ += rows; + return Status::OK(); +} + +Status MergeSorterState::_build_merge_tree_not_spilled(const SortDescription& sort_description) { + for (const auto& block : sorted_blocks_) { + cursors_.emplace_back(block, sort_description); + } + + if (sorted_blocks_.size() > 1) { + for (auto& cursor : cursors_) priority_queue_.push(MergeSortCursor(&cursor)); + } + return Status::OK(); +} + +Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { + _build_merge_tree_not_spilled(sort_description); + + if (spilled_sorted_block_streams_.size() > 0) { + if (sorted_blocks_.size() > 0) { + BlockSpillWriterUPtr spill_block_wirter; + RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer( + spill_block_batch_size_, spill_block_wirter)); + + LOG(WARNING) << "spill to disk: build merge tree, in memory sorted blocks: " + << sorted_blocks_.size(); + if (sorted_blocks_.size() == 1) { + RETURN_IF_ERROR(spill_block_wirter->write(sorted_blocks_[0])); + } else { + bool eos = false; + + // merge blocks in memory and write merge result to disk + while (!eos) { + merge_sorted_block_.clear_column_data(); + _merge_sort_read_not_spilled(spill_block_batch_size_, &merge_sorted_block_, + &eos); + RETURN_IF_ERROR(spill_block_wirter->write(merge_sorted_block_)); + } + } + spilled_sorted_block_streams_.emplace_back(spill_block_wirter->get_id()); + spill_block_wirter->close(); + } + RETURN_IF_ERROR(_merge_spilled_blocks(sort_description)); + } + return Status::OK(); } Status MergeSorterState::merge_sort_read(doris::RuntimeState* state, doris::vectorized::Block* block, bool* eos) { - size_t num_columns = sorted_blocks[0].columns(); + Status status = Status::OK(); + if (is_spilled_) { + status = merger_->get_next(block, eos); + LOG(WARNING) << "spill to disk: merger_->get_next, eos: " << *eos; + } else { + LOG(WARNING) << "spill to 
disk: _merge_sort_read_not_spilled, eos: " << *eos; + if (sorted_blocks_.empty()) { + *eos = true; + } else if (sorted_blocks_.size() == 1) { + if (offset_ != 0) { + sorted_blocks_[0].skip_num_rows(offset_); + } + block->swap(sorted_blocks_[0]); + *eos = true; + } else { + status = _merge_sort_read_not_spilled(state->batch_size(), block, eos); + } + } + return status; +} + +Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size, + doris::vectorized::Block* block, bool* eos) { Review Comment: Can this method return a non-OK status? If so, note that its returned status is not checked where you call it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org