dataroaring commented on code in PR #19099: URL: https://github.com/apache/doris/pull/19099#discussion_r1191833319
########## be/src/olap/memtable.cpp: ########## @@ -267,79 +218,137 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_ new_row->_row_pos, nullptr); } } +void MemTable::_add_rows_from_block(vectorized::Block& in_block) { + std::vector<int> row_pos_vec; + DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); + row_pos_vec.reserve(in_block.rows()); + for (int i = 0; i < _row_in_blocks.size(); i++) { + row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); + } + _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), + row_pos_vec.data() + in_block.rows()); +} +int MemTable::_sort_row_in_blocks() { + vectorized::Block in_block = _input_mutable_block.to_block(); + vectorized::MutableBlock mutable_block = + vectorized::MutableBlock::build_mutable_block(&in_block); + _vec_row_comparator->set_block(&mutable_block); + auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos); + size_t same_keys_num = 0; + // sort new rows + std::sort(new_row_it, _row_in_blocks.end(), + [this, &same_keys_num](const RowInBlock* l, const RowInBlock* r) -> bool { + auto value = (*(this->_vec_row_comparator))(l, r); + if (value == 0) { + same_keys_num++; + return l->_row_pos > r->_row_pos; + } else { + return value < 0; + } + }); + // merge new rows and old rows + std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), + [this, &same_keys_num](const RowInBlock* l, const RowInBlock* r) -> bool { + auto value = (*(this->_vec_row_comparator))(l, r); + if (value == 0) { + same_keys_num++; + return l->_row_pos > r->_row_pos; + } else { + return value < 0; + } + }); + _last_sorted_pos = _row_in_blocks.size(); + return same_keys_num; +} + template <bool is_final> -void MemTable::_collect_vskiplist_results() { - VecTable::Iterator it(_vec_skip_list.get()); +void MemTable::_merge_row_in_blocks() { vectorized::Block in_block = _input_mutable_block.to_block(); - if (_keys_type == KeysType::DUP_KEYS) { - vectorized::MutableBlock 
mutable_block = - vectorized::MutableBlock::build_mutable_block(&in_block); - _vec_row_comparator->set_block(&mutable_block); - std::sort(_row_in_blocks.begin(), _row_in_blocks.end(), - [this](const RowInBlock* l, const RowInBlock* r) -> bool { - auto value = (*(this->_vec_row_comparator))(l, r); - if (value == 0) { - return l->_row_pos > r->_row_pos; - } else { - return value < 0; - } - }); - std::vector<int> row_pos_vec; - DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); - row_pos_vec.reserve(in_block.rows()); - for (int i = 0; i < _row_in_blocks.size(); i++) { - row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); - } - _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), - row_pos_vec.data() + in_block.rows()); - } else { - size_t idx = 0; - for (it.SeekToFirst(); it.Valid(); it.Next()) { - auto& block_data = in_block.get_columns_with_type_and_name(); - // move key columns - for (size_t i = 0; i < _schema->num_key_columns(); ++i) { - _output_mutable_block.get_column_by_position(i)->insert_from( - *block_data[i].column.get(), it.key()->_row_pos); + vectorized::MutableBlock mutable_block = + vectorized::MutableBlock::build_mutable_block(&in_block); + _vec_row_comparator->set_block(&mutable_block); + std::vector<RowInBlock*> temp_row_in_blocks; + std::vector<RowInBlock*> agg_row_in_temp; + RowInBlock* prev_row; + temp_row_in_blocks.reserve(_last_sorted_pos); + agg_row_in_temp.reserve(_last_sorted_pos); + bool need_init_agg = true; + //only init agg if needed + for (int i = 0; i < _row_in_blocks.size(); i++) { + if (!temp_row_in_blocks.empty() && + (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) { + if (need_init_agg) { + prev_row->init_agg_places( + _arena->aligned_alloc(_total_size_of_aggregate_states, 16), + _offsets_of_aggregate_states.data()); + for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++) { + auto col_ptr = _input_mutable_block.mutable_columns()[cid].get(); + auto data = 
prev_row->agg_places(cid); + _agg_functions[cid]->create(data); + _agg_functions[cid]->add( Review Comment: _agg_functions may already have been initialized in the last call of _merge_row_in_blocks. We should make need_init_agg a member of MemTable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org