github-actions[bot] commented on code in PR #22504: URL: https://github.com/apache/doris/pull/22504#discussion_r1333846546
########## be/src/olap/rowset/beta_rowset_writer_v2.h: ########## @@ -120,6 +120,11 @@ class BetaRowsetWriterV2 : public RowsetWriter { return Status::OK(); } + // currently, a rowset cantains at most one segment, so we just return the segment's indicator maps + std::shared_ptr<IndicatorMaps> get_indicator_maps() const override { Review Comment: warning: function 'get_indicator_maps' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const override { ``` ########## be/src/exec/tablet_info.h: ########## @@ -90,6 +90,7 @@ class OlapTableSchemaParam { return _partial_update_input_columns; } bool is_strict_mode() const { return _is_strict_mode; } + bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; } Review Comment: warning: function 'is_unique_key_replace_if_not_null' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; } ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. Charactor `?` means that the cell is filled +// with indicator value(currently we use null as indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the correspoding columns in the input block has cell with indicator value. +// Becase the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( + vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read, + const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) { + vectorized::MutableColumns full_columns = full_block->mutate_columns(); + vectorized::Block old_full_read_block = _tablet_schema->create_missing_columns_block(); + vectorized::MutableColumns old_full_read_columns = old_full_read_block.mutate_columns(); Review Comment: warning: variable 'old_full_read_columns' is not initialized [cppcoreguidelines-init-variables] ```suggestion vectorized::MutableColumns old_full_read_columns = 0 = old_full_read_block.mutate_columns(); ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. Charactor `?` means that the cell is filled +// with indicator value(currently we use null as indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the correspoding columns in the input block has cell with indicator value. +// Becase the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( + vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read, + const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) { + vectorized::MutableColumns full_columns = full_block->mutate_columns(); + vectorized::Block old_full_read_block = _tablet_schema->create_missing_columns_block(); + vectorized::MutableColumns old_full_read_columns = old_full_read_block.mutate_columns(); + vectorized::Block old_point_read_block = _tablet_schema->create_block_by_cids(cids_point_read); + vectorized::MutableColumns old_point_read_columns = old_point_read_block.mutate_columns(); Review Comment: warning: variable 'old_point_read_columns' is not initialized [cppcoreguidelines-init-variables] ```suggestion vectorized::MutableColumns old_point_read_columns = 0 = old_point_read_block.mutate_columns(); ``` ########## be/src/olap/rowset/segment_v2/segment_writer.h: ########## @@ -128,9 +130,15 @@ class SegmentWriter { void clear(); void set_mow_context(std::shared_ptr<MowContext> mow_context); - Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, + Status fill_missing_columns(vectorized::Block* full_block, + const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, + const std::vector<uint32_t>& cids_point_read, const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos); + bool has_default_or_nullable, const size_t& segment_start_pos, + bool is_unique_key_replace_if_not_null); + + std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return _indicator_maps; } Review Comment: warning: function 'get_indicator_maps' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return _indicator_maps; } ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. Charactor `?` means that the cell is filled +// with indicator value(currently we use null as indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the correspoding columns in the input block has cell with indicator value. +// Becase the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( + vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read, + const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) { + vectorized::MutableColumns full_columns = full_block->mutate_columns(); + vectorized::Block old_full_read_block = _tablet_schema->create_missing_columns_block(); + vectorized::MutableColumns old_full_read_columns = old_full_read_block.mutate_columns(); + vectorized::Block old_point_read_block = _tablet_schema->create_block_by_cids(cids_point_read); + vectorized::MutableColumns old_point_read_columns = old_point_read_block.mutate_columns(); + vectorized::MutableColumns filled_including_value_columns = + old_point_read_block.clone_empty_columns(); // used to hold data after being filled + // !!NOTE: columns in old_point_read_block may have different row nums! + + // rowid in input block -> line num in old_full_read_block + std::map<uint32_t, uint32_t> missing_cols_read_index; + // partial update cid -> (rowid in input block -> line num in old_point_read_block) + std::map<uint32_t, std::map<uint32_t, uint32_t>> parital_update_cols_read_index; + + if (is_unique_key_replace_if_not_null) { + RETURN_IF_ERROR(_tablet->read_columns_by_plan( + _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read, &cids_point_read, + &old_full_read_block, &old_point_read_block, &missing_cols_read_index, + &parital_update_cols_read_index)); + } else { + RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema, _rsid_to_rowset, read_plan, + &cids_full_read, &old_full_read_block, + &missing_cols_read_index)); + } + // build default value columns - auto default_value_block = old_value_block.clone_empty(); + auto default_value_block = old_full_read_block.clone_empty(); auto mutable_default_value_columns = default_value_block.mutate_columns(); if (has_default_or_nullable) { - for (auto i = 0; i < cids_missing.size(); ++i) { - const auto& column = _tablet_schema->column(cids_missing[i]); + for (auto i = 0; i < cids_full_read.size(); ++i) { + const auto& column = _tablet_schema->column(cids_full_read[i]); if (column.has_default_value()) { - auto default_value = _tablet_schema->column(cids_missing[i]).default_value(); + auto default_value = _tablet_schema->column(cids_full_read[i]).default_value(); vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()), default_value.size()); - old_value_block.get_by_position(i).type->from_string( + old_full_read_block.get_by_position(i).type->from_string( rb, mutable_default_value_columns[i].get()); } } } - // fill all missing value from mutable_old_columns, need to consider default value and null value + // fill all missing value from old_value_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { if (use_default_or_null_flag[idx]) { - for (auto i = 0; i < cids_missing.size(); ++i) { + for (auto i = 0; i < cids_full_read.size(); ++i) { // if the column has default value, fiil it with default value // otherwise, if the column is nullable, fill it with null value - const auto& tablet_column = _tablet_schema->column(cids_missing[i]); + const auto& tablet_column = _tablet_schema->column(cids_full_read[i]); if (tablet_column.has_default_value()) { - mutable_full_columns[cids_missing[i]]->insert_from( + full_columns[cids_full_read[i]]->insert_from( *mutable_default_value_columns[i].get(), 0); } else if (tablet_column.is_nullable()) { auto nullable_column = assert_cast<vectorized::ColumnNullable*>( - mutable_full_columns[cids_missing[i]].get()); + full_columns[cids_full_read[i]].get()); nullable_column->insert_null_elements(1); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value // columns are useless and won't be read. So we can just put arbitary values in the cells - mutable_full_columns[cids_missing[i]]->insert_default(); + full_columns[cids_full_read[i]]->insert_default(); } } continue; } - auto pos_in_old_block = read_index[idx + segment_start_pos]; - for (auto i = 0; i < cids_missing.size(); ++i) { - mutable_full_columns[cids_missing[i]]->insert_from( - *old_value_block.get_columns_with_type_and_name()[i].column.get(), - pos_in_old_block); + + //TODO(bobhan1): fix me later(the wrong row pos) + // TODO(bobhan1): handle row store column here!!! + uint32_t pos_in_old_block = missing_cols_read_index[idx + segment_start_pos]; + for (auto i = 0; i < cids_full_read.size(); ++i) { + uint32_t cid = cids_full_read[i]; + if (full_block->get_by_position(cid).name != BeConsts::ROW_STORE_COL) { + full_columns[cid]->insert_from( + *old_full_read_block.get_columns_with_type_and_name()[i].column.get(), + pos_in_old_block); + } + } + + if (is_unique_key_replace_if_not_null) { + //TODO(bobhan1): fix me later(the wrong row pos) + for (size_t i = 0; i < cids_point_read.size(); i++) { + uint32_t cid = cids_point_read[i]; + if (parital_update_cols_read_index[cid].contains(idx + segment_start_pos)) { + // cells with indicator value should be replaced with old values in previous rows + uint32_t pos_in_old_block = Review Comment: warning: variable 'pos_in_old_block' is not initialized [cppcoreguidelines-init-variables] ```suggestion uint32_t pos_in_old_block = 0 = ``` ########## be/src/vec/jsonb/serialize.cpp: ########## @@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const } } +void JsonbSerializeUtil::jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const char* data, size_t size, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& + col_uid_to_idx_cid_point_read, + std::unordered_map<uint32_t, bool>& point_read_cids, Block& block_full_read, + Block& block_point_read) { + JsonbDocument& doc = *JsonbDocument::createDocument(data, size); + size_t full_read_filled_columns = 0; + for (auto it = doc->begin(); it != doc->end(); ++it) { + auto col_it = col_uid_to_idx_full_read.find(it->getKeyId()); + if (col_it != col_uid_to_idx_full_read.end()) { + MutableColumnPtr dst_column = + block_full_read.get_by_position(col_it->second).column->assume_mutable(); + serdes_full_read[col_it->second]->read_one_cell_from_jsonb(*dst_column, it->value()); + ++full_read_filled_columns; + } else { + auto it2 = col_uid_to_idx_cid_point_read.find(it->getKeyId()); + if (it2 != col_uid_to_idx_cid_point_read.end()) { + uint32_t cid = it2->second.second; + uint32_t idx = it2->second.first; + auto it3 = point_read_cids.find(cid); + if (it3 != point_read_cids.end()) { + MutableColumnPtr dst_column = Review Comment: warning: variable 'dst_column' is not initialized [cppcoreguidelines-init-variables] ```suggestion MutableColumnPtr dst_column = 0 = ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -332,25 +335,69 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); - // find missing column cids + // all the cells in missing_cids will be filled from the old rows std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids(); std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids(); + std::vector<uint32_t> determistic_cids; // columns that dont't need read from old rows + std::vector<uint32_t> point_read_cids; // create full block and fill with input columns auto full_block = _tablet_schema->create_block(); - size_t input_id = 0; - for (auto i : including_cids) { - full_block.replace_by_position(i, block->get_by_position(input_id++).column); + + // indictors to apply partial updates for including columns, + // cells with `indicator` values should be read from the old rows + // currently we use treat null value as `indicator` + IndicatorMapsVertical indicator_maps_vertical; // cid -> rows + + PartialUpdateReadPlan read_plan; + if (_tablet_schema->store_row_column()) { + read_plan = RowStoreReadPlan {}; + } + + bool is_unique_key_replace_if_not_null = _tablet_schema->is_unique_key_replace_if_not_null(); Review Comment: warning: variable 'is_unique_key_replace_if_not_null' is not initialized [cppcoreguidelines-init-variables] ```suggestion bool is_unique_key_replace_if_not_null = false = _tablet_schema->is_unique_key_replace_if_not_null(); ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. Charactor `?` means that the cell is filled +// with indicator value(currently we use null as indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the correspoding columns in the input block has cell with indicator value. +// Becase the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( + vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read, + const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) { + vectorized::MutableColumns full_columns = full_block->mutate_columns(); + vectorized::Block old_full_read_block = _tablet_schema->create_missing_columns_block(); + vectorized::MutableColumns old_full_read_columns = old_full_read_block.mutate_columns(); + vectorized::Block old_point_read_block = _tablet_schema->create_block_by_cids(cids_point_read); + vectorized::MutableColumns old_point_read_columns = old_point_read_block.mutate_columns(); + vectorized::MutableColumns filled_including_value_columns = Review Comment: warning: variable 'filled_including_value_columns' is not initialized [cppcoreguidelines-init-variables] ```suggestion vectorized::MutableColumns filled_including_value_columns = 0 = ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -332,25 +335,69 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); - // find missing column cids + // all the cells in missing_cids will be filled from the old rows std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids(); std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids(); + std::vector<uint32_t> determistic_cids; // columns that dont't need read from old rows + std::vector<uint32_t> point_read_cids; // create full block and fill with input columns auto full_block = _tablet_schema->create_block(); - size_t input_id = 0; - for (auto i : including_cids) { - full_block.replace_by_position(i, block->get_by_position(input_id++).column); + + // indictors to apply partial updates for including columns, + // cells with `indicator` values should be read from the old rows + // currently we use treat null value as `indicator` + IndicatorMapsVertical indicator_maps_vertical; // cid -> rows + + PartialUpdateReadPlan read_plan; Review Comment: warning: variable 'read_plan' is not initialized [cppcoreguidelines-init-variables] ```suggestion PartialUpdateReadPlan read_plan = 0; ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -332,25 +335,69 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write); - // find missing column cids + // all the cells in missing_cids will be filled from the old rows std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids(); std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids(); + std::vector<uint32_t> determistic_cids; // columns that dont't need read from old rows + std::vector<uint32_t> point_read_cids; // create full block and fill with input columns auto full_block = _tablet_schema->create_block(); - size_t input_id = 0; - for (auto i : including_cids) { - full_block.replace_by_position(i, block->get_by_position(input_id++).column); + + // indictors to apply partial updates for including columns, + // cells with `indicator` values should be read from the old rows + // currently we use treat null value as `indicator` + IndicatorMapsVertical indicator_maps_vertical; // cid -> rows Review Comment: warning: variable 'indicator_maps_vertical' is not initialized [cppcoreguidelines-init-variables] ```suggestion IndicatorMapsVertical indicator_maps_vertical = 0; // cid -> rows ``` ########## be/src/olap/rowset/beta_rowset_writer.h: ########## @@ -102,6 +102,11 @@ class BetaRowsetWriter : public RowsetWriter { int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); } + // currently, a rowset cantains at most one segment, so we just return the segment's indicator maps + std::shared_ptr<IndicatorMaps> get_indicator_maps() const override { Review Comment: warning: function 'get_indicator_maps' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const override { ``` ########## be/src/olap/rowset/rowset_writer.h: ########## @@ -131,6 +131,8 @@ class RowsetWriter { virtual int64_t num_rows_filtered() const = 0; + virtual std::shared_ptr<IndicatorMaps> get_indicator_maps() const = 0; Review Comment: warning: function 'get_indicator_maps' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] virtual std::shared_ptr<IndicatorMaps> get_indicator_maps() const = 0; ``` ########## be/src/olap/rowset/segment_creator.h: ########## @@ -166,6 +170,10 @@ class SegmentCreator { int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); } + std::shared_ptr<IndicatorMaps> get_indicator_maps() const { Review Comment: warning: function 'get_indicator_maps' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const { ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. Charactor `?` means that the cell is filled +// with indicator value(currently we use null as indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the correspoding columns in the input block has cell with indicator value. +// Becase the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( + vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read, + const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) { + vectorized::MutableColumns full_columns = full_block->mutate_columns(); Review Comment: warning: variable 'full_columns' is not initialized [cppcoreguidelines-init-variables] ```suggestion vectorized::MutableColumns full_columns = 0 = full_block->mutate_columns(); ``` ########## be/src/olap/tablet.cpp: ########## @@ -2687,20 +2688,40 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); - RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); - assert(column_ptr->size() == rowids.size()); + RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(), column_ptr)); + assert(column_ptr->size() == rids.size()); auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get()); - vectorized::DataTypeSerDeSPtrs serdes; - serdes.resize(cids.size()); - std::unordered_map<uint32_t, uint32_t> col_uid_to_idx; - for (int i = 0; i < cids.size(); ++i) { - const TabletColumn& column = tablet_schema->column(cids[i]); + vectorized::DataTypeSerDeSPtrs serdes_full_read; + serdes_full_read.resize(cids_full_read->size()); Review Comment: warning: variable 'serdes_full_read' is not initialized [cppcoreguidelines-init-variables] ```suggestion ypeSerDeSPtrs serdes_full_read = 0; ``` ########## be/src/olap/tablet_schema.h: ########## @@ -363,8 +364,12 @@ class TabletSchema { } void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode = is_strict_mode; } bool is_strict_mode() const { return _is_strict_mode; } - std::vector<uint32_t> get_missing_cids() const { return _missing_cids; } - std::vector<uint32_t> get_update_cids() const { return _update_cids; } + void set_is_unique_key_replace_if_not_null(bool is_unique_key_replace_if_not_null) { + _is_unique_key_replace_if_not_null = is_unique_key_replace_if_not_null; + } + bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; } Review Comment: warning: function 'is_unique_key_replace_if_not_null' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] bool is_unique_key_replace_if_not_null() const { return _is_unique_key_replace_if_not_null; } ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -539,93 +603,150 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - auto old_value_block = _tablet_schema->create_missing_columns_block(); - std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids(); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. Charactor `?` means that the cell is filled +// with indicator value(currently we use null as indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the correspoding columns in the input block has cell with indicator value. +// Becase the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( + vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>& cids_full_read, const std::vector<uint32_t>& cids_point_read, + const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, bool is_unique_key_replace_if_not_null) { + vectorized::MutableColumns full_columns = full_block->mutate_columns(); + vectorized::Block old_full_read_block = _tablet_schema->create_missing_columns_block(); + vectorized::MutableColumns old_full_read_columns = old_full_read_block.mutate_columns(); + vectorized::Block old_point_read_block = _tablet_schema->create_block_by_cids(cids_point_read); + vectorized::MutableColumns old_point_read_columns = old_point_read_block.mutate_columns(); + vectorized::MutableColumns filled_including_value_columns = + old_point_read_block.clone_empty_columns(); // used to hold data after being filled + // !!NOTE: columns in old_point_read_block may have different row nums! + + // rowid in input block -> line num in old_full_read_block + std::map<uint32_t, uint32_t> missing_cols_read_index; + // partial update cid -> (rowid in input block -> line num in old_point_read_block) + std::map<uint32_t, std::map<uint32_t, uint32_t>> parital_update_cols_read_index; + + if (is_unique_key_replace_if_not_null) { + RETURN_IF_ERROR(_tablet->read_columns_by_plan( + _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read, &cids_point_read, + &old_full_read_block, &old_point_read_block, &missing_cols_read_index, + &parital_update_cols_read_index)); + } else { + RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema, _rsid_to_rowset, read_plan, + &cids_full_read, &old_full_read_block, + &missing_cols_read_index)); + } + // build default value columns - auto default_value_block = old_value_block.clone_empty(); + auto default_value_block = old_full_read_block.clone_empty(); auto mutable_default_value_columns = default_value_block.mutate_columns(); if (has_default_or_nullable) { - for (auto i = 0; i < cids_missing.size(); ++i) { - const auto& column = _tablet_schema->column(cids_missing[i]); + for (auto i = 0; i < cids_full_read.size(); ++i) { + const auto& column = _tablet_schema->column(cids_full_read[i]); if (column.has_default_value()) { - auto default_value = _tablet_schema->column(cids_missing[i]).default_value(); + auto default_value = _tablet_schema->column(cids_full_read[i]).default_value(); vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()), default_value.size()); - old_value_block.get_by_position(i).type->from_string( + old_full_read_block.get_by_position(i).type->from_string( rb, mutable_default_value_columns[i].get()); } } } - // fill all missing value from mutable_old_columns, need to consider default value and null value + // fill all missing value from old_value_columns, need to consider default value and null value for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { if (use_default_or_null_flag[idx]) { - for (auto i = 0; i < cids_missing.size(); ++i) { + for (auto i = 0; i < cids_full_read.size(); ++i) { // if the column has default value, fiil it with default value // otherwise, if the column is nullable, fill it with null value - const auto& tablet_column = _tablet_schema->column(cids_missing[i]); + const auto& tablet_column = _tablet_schema->column(cids_full_read[i]); if (tablet_column.has_default_value()) { - mutable_full_columns[cids_missing[i]]->insert_from( + full_columns[cids_full_read[i]]->insert_from( *mutable_default_value_columns[i].get(), 0); } else if (tablet_column.is_nullable()) { auto nullable_column = assert_cast<vectorized::ColumnNullable*>( - mutable_full_columns[cids_missing[i]].get()); + full_columns[cids_full_read[i]].get()); nullable_column->insert_null_elements(1); } else { // If the control flow reaches this branch, the column neither has default value // nor is nullable. It means that the row's delete sign is marked, and the value // columns are useless and won't be read. So we can just put arbitary values in the cells - mutable_full_columns[cids_missing[i]]->insert_default(); + full_columns[cids_full_read[i]]->insert_default(); } } continue; } - auto pos_in_old_block = read_index[idx + segment_start_pos]; - for (auto i = 0; i < cids_missing.size(); ++i) { - mutable_full_columns[cids_missing[i]]->insert_from( - *old_value_block.get_columns_with_type_and_name()[i].column.get(), - pos_in_old_block); + + //TODO(bobhan1): fix me later(the wrong row pos) + // TODO(bobhan1): handle row store column here!!! + uint32_t pos_in_old_block = missing_cols_read_index[idx + segment_start_pos]; Review Comment: warning: variable 'pos_in_old_block' is not initialized [cppcoreguidelines-init-variables] ```suggestion uint32_t pos_in_old_block = 0 = missing_cols_read_index[idx + segment_start_pos]; ``` ########## be/src/olap/tablet.cpp: ########## @@ -2687,20 +2688,40 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint &column_iterator, &stats)); // get and parse tuple row vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); - RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); - assert(column_ptr->size() == rowids.size()); + RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(), column_ptr)); + assert(column_ptr->size() == rids.size()); auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get()); - vectorized::DataTypeSerDeSPtrs serdes; - serdes.resize(cids.size()); - std::unordered_map<uint32_t, uint32_t> col_uid_to_idx; - for (int i = 0; i < cids.size(); ++i) { - const TabletColumn& column = tablet_schema->column(cids[i]); + vectorized::DataTypeSerDeSPtrs serdes_full_read; + serdes_full_read.resize(cids_full_read->size()); + + std::unordered_map<uint32_t, uint32_t> col_uid_to_idx_full_read; + + for (int i = 0; i < cids_full_read->size(); ++i) { + const TabletColumn& column = tablet_schema->column(cids_full_read->at(i)); vectorized::DataTypePtr type = vectorized::DataTypeFactory::instance().create_data_type(column); - col_uid_to_idx[column.unique_id()] = i; - serdes[i] = type->get_serde(); + col_uid_to_idx_full_read[column.unique_id()] = i; + serdes_full_read[i] = type->get_serde(); + } + + if (with_point_read) { + vectorized::DataTypeSerDeSPtrs serdes_point_read; + serdes_point_read.resize(cids_point_read->size()); Review Comment: warning: variable 'serdes_point_read' is not initialized [cppcoreguidelines-init-variables] ```suggestion peSerDeSPtrs serdes_point_read = 0; ``` ########## be/src/olap/tablet.cpp: ########## @@ -3084,110 +3118,278 @@ Status Tablet::generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, + std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps, vectorized::Block* output_block) { // do partial update related works // 1. read columns by read plan // 2. generate new block // 3. write a new segment and modify rowset meta // 4. mark current keys deleted CHECK(output_block); - auto full_mutable_columns = output_block->mutate_columns(); - auto old_block = rowset_schema->create_missing_columns_block(); - auto missing_cids = rowset_schema->get_missing_cids(); - auto update_block = rowset_schema->create_update_columns_block(); - auto update_cids = rowset_schema->get_update_cids(); - - std::map<uint32_t, uint32_t> read_index_old; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, - old_block, &read_index_old)); - - std::map<uint32_t, uint32_t> read_index_update; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, - rsid_to_rowset, update_block, &read_index_update)); + if (!indicator_maps) { + auto full_mutable_columns = output_block->mutate_columns(); + auto old_block = rowset_schema->create_missing_columns_block(); + auto missing_cids = rowset_schema->get_missing_cids(); + auto update_block = rowset_schema->create_update_columns_block(); + auto update_cids = rowset_schema->get_update_cids(); + + std::map<uint32_t, uint32_t> read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &old_block, &read_index_old)); + + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + + // build full block + CHECK(read_index_old.size() == read_index_update.size()); + for (auto i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_block.get_columns_with_type_and_name()[i].column.get(), + read_index_old[idx]); + } + } + for (auto i = 0; i < update_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_update.size(); ++idx) { + full_mutable_columns[update_cids[i]]->insert_from( + *update_block.get_columns_with_type_and_name()[i].column.get(), + read_index_update[idx]); + } + } + VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); + } else { + std::unordered_set<uint32_t> cids_point_read; + for (const auto& [_, cids] : *indicator_maps) { Review Comment: warning: variable 'cids_point_read' is not initialized [cppcoreguidelines-init-variables] ```suggestion _set<uint32_t> cids_point_read = 0; ``` ########## be/src/vec/jsonb/serialize.cpp: ########## @@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const } } +void JsonbSerializeUtil::jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const char* data, size_t size, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& + col_uid_to_idx_cid_point_read, + std::unordered_map<uint32_t, bool>& point_read_cids, Block& block_full_read, + Block& block_point_read) { + JsonbDocument& doc = *JsonbDocument::createDocument(data, size); + size_t full_read_filled_columns = 0; + for (auto it = doc->begin(); it != doc->end(); ++it) { + auto col_it = col_uid_to_idx_full_read.find(it->getKeyId()); + if (col_it != col_uid_to_idx_full_read.end()) { + MutableColumnPtr dst_column = Review Comment: warning: variable 'dst_column' is not initialized [cppcoreguidelines-init-variables] ```suggestion MutableColumnPtr dst_column = 0 = ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org