This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new aafce71250 [fix](merge-on-write) fix that the query result has duplicate keys (#16336) aafce71250 is described below commit aafce71250cdd20f008655848ad19c30c20d6703 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Mon Feb 6 17:09:53 2023 +0800 [fix](merge-on-write) fix that the query result has duplicate keys (#16336) * [fix](merge-on-write) fix that the query result has duplicate keys * add ut --- be/src/olap/compaction.cpp | 4 ++++ be/src/olap/merger.cpp | 4 ++++ be/src/olap/rowset/beta_rowset_writer.cpp | 5 ++-- be/src/olap/rowset/beta_rowset_writer.h | 3 ++- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 4 ++++ be/test/olap/rowid_conversion_test.cpp | 27 +++++++++++++++------- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 3856c6f85d..c7029e9722 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -250,6 +250,10 @@ bool Compaction::handle_ordered_data_compaction() { if (!config::enable_ordered_data_compaction) { return false; } + if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && + _tablet->enable_unique_key_merge_on_write()) { + return false; + } // check delete version: if compaction type is base compaction and // has a delete version, use original compaction if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index ac75675496..58d9df0158 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -267,6 +267,10 @@ Status Merger::vertical_compact_one_group( } reader_params.tablet_schema = merge_tablet_schema; + if (tablet->enable_unique_key_merge_on_write()) { + reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap(); + } + reader_params.return_columns = column_group; reader_params.origin_return_columns = &reader_params.return_columns; RETURN_NOT_OK(reader.init(reader_params)); 
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index e64d4f97ee..081df80633 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -830,13 +830,11 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met int64_t num_rows_written = 0; int64_t total_data_size = 0; int64_t total_index_size = 0; - std::vector<uint32_t> segment_num_rows; std::vector<KeyBoundsPB> segments_encoded_key_bounds; { std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); for (const auto& itr : _segid_statistics_map) { num_rows_written += itr.second.row_num; - segment_num_rows.push_back(itr.second.row_num); total_data_size += itr.second.data_size; total_index_size += itr.second.index_size; segments_encoded_key_bounds.push_back(itr.second.key_bounds); @@ -851,7 +849,6 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met } rowset_meta->set_num_segments(num_seg); - _segment_num_rows = segment_num_rows; // TODO(zhangzhengyu): key_bounds.size() should equal num_seg, but currently not always rowset_meta->set_num_rows(num_rows_written + _num_rows_written); rowset_meta->set_total_disk_size(total_data_size + _total_data_size); @@ -992,6 +989,8 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); _segid_statistics_map.emplace(segid, segstat); + _segment_num_rows.resize(_num_segment); + _segment_num_rows[_num_segment - 1] = row_num; } VLOG_DEBUG << "_segid_statistics_map add new record. 
segid:" << segid << " row_num:" << row_num << " data_size:" << segment_size << " index_size:" << index_size; diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 754491bc82..6f32b24f08 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -143,7 +143,8 @@ protected: std::unique_ptr<segment_v2::SegmentWriter> _segment_writer; mutable SpinLock _lock; // protect following vectors. - // record rows number of every segment + // record rows number of every segment already written, using for rowid + // conversion when compaction in unique key with MoW model std::vector<uint32_t> _segment_num_rows; std::vector<io::FileWriterPtr> _file_writers; // for unique key table with merge-on-write diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index edcd3f13a3..3c292edefe 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -64,6 +64,10 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block, if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) { // segment is full, need flush columns and create new segment writer RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], true)); + + _segment_num_rows.resize(_cur_writer_idx + 1); + _segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count(); + std::unique_ptr<segment_v2::SegmentWriter> writer; RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer)); _segment_writers.emplace_back(std::move(writer)); diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index a08f17f4c4..2de735dd5d 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -355,14 +355,6 @@ protected: input_rowsets.push_back(rowset); } - // create input rowset 
reader - vector<RowsetReaderSharedPtr> input_rs_readers; - for (auto& rowset : input_rowsets) { - RowsetReaderSharedPtr rs_reader; - EXPECT_TRUE(rowset->create_reader(&rs_reader).ok()); - input_rs_readers.push_back(std::move(rs_reader)); - } - // create output rowset writer RowsetWriterContext writer_context; create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, &writer_context); @@ -374,6 +366,19 @@ protected: TabletSharedPtr tablet = create_tablet(*tablet_schema, enable_unique_key_merge_on_write, output_rs_writer->version().first - 1, has_delete_handler); + if (enable_unique_key_merge_on_write) { + tablet->tablet_meta()->delete_bitmap().add({input_rowsets[0]->rowset_id(), 0, 0}, 0); + tablet->tablet_meta()->delete_bitmap().add({input_rowsets[0]->rowset_id(), 0, 0}, 3); + } + + // create input rowset reader + vector<RowsetReaderSharedPtr> input_rs_readers; + for (auto& rowset : input_rowsets) { + RowsetReaderSharedPtr rs_reader; + EXPECT_TRUE(rowset->create_reader(&rs_reader).ok()); + input_rs_readers.push_back(std::move(rs_reader)); + } + Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; @@ -424,10 +429,16 @@ protected: RowLocation src(input_rowsets[rs_id]->rowset_id(), s_id, row_id); RowLocation dst; int res = rowid_conversion.get(src, &dst); + // key deleted by delete bitmap + if (enable_unique_key_merge_on_write && rs_id == 0 && s_id == 0 && + (row_id == 0 || row_id == 3)) { + EXPECT_LT(res, 0); + } if (res < 0) { continue; } size_t rowid_in_output_data = dst.row_id; + EXPECT_GT(segment_num_rows[dst.segment_id], dst.row_id); for (auto n = 1; n <= dst.segment_id; n++) { rowid_in_output_data += segment_num_rows[n - 1]; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org