dataroaring commented on code in PR #19099: URL: https://github.com/apache/doris/pull/19099#discussion_r1194769944
########## be/src/olap/memtable.cpp: ########## @@ -267,102 +230,152 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_ new_row->_row_pos, nullptr); } } -template <bool is_final> -void MemTable::_collect_vskiplist_results() { - if (_keys_type == KeysType::DUP_KEYS) { - if (_schema->num_key_columns() > 0) { - _collect_dup_table_with_keys(); - } else { - // skip sort if the table is dup table without keys - _collect_dup_table_without_keys(); - } - } else { - VecTable::Iterator it(_vec_skip_list.get()); - vectorized::Block in_block = _input_mutable_block.to_block(); - size_t idx = 0; - for (it.SeekToFirst(); it.Valid(); it.Next()) { - auto& block_data = in_block.get_columns_with_type_and_name(); - // move key columns - for (size_t i = 0; i < _schema->num_key_columns(); ++i) { - _output_mutable_block.get_column_by_position(i)->insert_from( - *block_data[i].column.get(), it.key()->_row_pos); - } - // get value columns from agg_places - for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) { - auto function = _agg_functions[i]; - auto agg_place = it.key()->agg_places(i); - auto col_ptr = _output_mutable_block.get_column_by_position(i).get(); - function->insert_result_into(agg_place, *col_ptr); - if constexpr (is_final) { - function->destroy(agg_place); - } else { - function->reset(agg_place); - function->add(agg_place, - const_cast<const doris::vectorized::IColumn**>(&col_ptr), idx, - nullptr); - } - } - if constexpr (!is_final) { - // re-index the row_pos in VSkipList - it.key()->_row_pos = idx; - idx++; - } - } - if constexpr (!is_final) { - // if is not final, we collect the agg results to input_block and then continue to insert - size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); - // flush will not run here, so will not duplicate `_flush_mem_tracker` - _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); - _mem_usage = shrunked_after_agg; - _input_mutable_block.swap(_output_mutable_block); - 
//TODO(weixang):opt here. - std::unique_ptr<vectorized::Block> empty_input_block = - in_block.create_same_struct_block(0); - _output_mutable_block = - vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); - _output_mutable_block.clear_column_data(); - } - } - - if (is_final) { - _vec_skip_list.reset(); +void MemTable::prepare_block_for_flush(vectorized::Block& in_block) { + std::vector<int> row_pos_vec; + DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); + row_pos_vec.reserve(in_block.rows()); + for (int i = 0; i < _row_in_blocks.size(); i++) { + row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); } + _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), + row_pos_vec.data() + in_block.rows()); } - -void MemTable::_collect_dup_table_with_keys() { +int MemTable::_sort() { vectorized::Block in_block = _input_mutable_block.to_block(); vectorized::MutableBlock mutable_block = vectorized::MutableBlock::build_mutable_block(&in_block); _vec_row_comparator->set_block(&mutable_block); - std::sort(_row_in_blocks.begin(), _row_in_blocks.end(), - [this](const RowInBlock* l, const RowInBlock* r) -> bool { + auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos); + size_t same_keys_num = 0; + // sort new rows + std::sort(new_row_it, _row_in_blocks.end(), + [this, &same_keys_num](const RowInBlock* l, const RowInBlock* r) -> bool { auto value = (*(this->_vec_row_comparator))(l, r); if (value == 0) { + same_keys_num++; return l->_row_pos > r->_row_pos; } else { return value < 0; } }); - std::vector<int> row_pos_vec; - DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); - row_pos_vec.reserve(in_block.rows()); - for (int i = 0; i < _row_in_blocks.size(); i++) { - row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); + // merge new rows and old rows + std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), + [this, &same_keys_num](const RowInBlock* l, const RowInBlock* r) -> bool { + auto value = 
(*(this->_vec_row_comparator))(l, r); + if (value == 0) { + same_keys_num++; + return l->_row_pos > r->_row_pos; + } else { + return value < 0; + } + }); + _last_sorted_pos = _row_in_blocks.size(); + return same_keys_num; +} + +template <bool is_final> +void MemTable::_aggregate_one_row(RowInBlock* row, + const vectorized::ColumnsWithTypeAndName& block_data, Review Comment: `finalize_one_row` would be a better name for this function than `_aggregate_one_row`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org