This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 12c4d1f4dd [feature-wip](unique-key-merge-on-write) unique key table with MOW supports sequence column (#11808) 12c4d1f4dd is described below commit 12c4d1f4dd0f1f8dea5bda5f3fb5121ee8063309 Author: Xin Liao <liaoxin...@126.com> AuthorDate: Wed Aug 17 10:56:14 2022 +0800 [feature-wip](unique-key-merge-on-write) unique key table with MOW supports sequence column (#11808) --- be/src/olap/primary_key_index.cpp | 3 +- be/src/olap/primary_key_index.h | 9 +- be/src/olap/rowset/segment_v2/segment.cpp | 45 ++++++- be/src/olap/rowset/segment_v2/segment_writer.cpp | 34 ++++- be/src/olap/tablet.cpp | 24 +++- be/test/olap/primary_key_index_test.cpp | 2 +- be/test/olap/rowset/segment_v2/segment_test.cpp | 144 ++++++++++++++++++++- .../main/java/org/apache/doris/catalog/Column.java | 4 +- .../java/org/apache/doris/catalog/OlapTable.java | 12 +- .../apache/doris/datasource/InternalCatalog.java | 5 - 10 files changed, 258 insertions(+), 24 deletions(-) diff --git a/be/src/olap/primary_key_index.cpp b/be/src/olap/primary_key_index.cpp index 52129675e7..4052cb1613 100644 --- a/be/src/olap/primary_key_index.cpp +++ b/be/src/olap/primary_key_index.cpp @@ -42,7 +42,8 @@ Status PrimaryKeyIndexBuilder::init() { Status PrimaryKeyIndexBuilder::add_item(const Slice& key) { RETURN_IF_ERROR(_primary_key_index_builder->add(&key)); - _bloom_filter_index_builder->add_values(&key, 1); + Slice key_without_seq = Slice(key.get_data(), key.get_size() - _seq_col_length); + _bloom_filter_index_builder->add_values(&key_without_seq, 1); // the key is already sorted, so the first key is min_key, and // the last key is max_key. 
if (UNLIKELY(_num_rows == 0)) { diff --git a/be/src/olap/primary_key_index.h b/be/src/olap/primary_key_index.h index ac85ff64c8..c2b87aa702 100644 --- a/be/src/olap/primary_key_index.h +++ b/be/src/olap/primary_key_index.h @@ -38,8 +38,8 @@ namespace doris { // NOTE: for now, it's only used when unique key merge-on-write property enabled. class PrimaryKeyIndexBuilder { public: - PrimaryKeyIndexBuilder(io::FileWriter* file_writer) - : _file_writer(file_writer), _num_rows(0), _size(0) {} + PrimaryKeyIndexBuilder(io::FileWriter* file_writer, size_t seq_col_length) + : _file_writer(file_writer), _num_rows(0), _size(0), _seq_col_length(seq_col_length) {} Status init(); @@ -49,8 +49,8 @@ public: uint64_t size() const { return _size; } - Slice min_key() { return Slice(_min_key); } - Slice max_key() { return Slice(_max_key); } + Slice min_key() { return Slice(_min_key.data(), _min_key.size() - _seq_col_length); } + Slice max_key() { return Slice(_max_key.data(), _max_key.size() - _seq_col_length); } Status finalize(segment_v2::PrimaryKeyIndexMetaPB* meta); @@ -58,6 +58,7 @@ private: io::FileWriter* _file_writer = nullptr; uint32_t _num_rows; uint64_t _size; + size_t _seq_col_length; faststring _min_key; faststring _max_key; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 18c7e2797f..44ebf8de1c 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -245,19 +245,58 @@ Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column, Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) { RETURN_IF_ERROR(load_index()); + bool has_seq_col = _tablet_schema->has_sequence_col(); + size_t seq_col_length = 0; + if (has_seq_col) { + seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1; + } + Slice key_without_seq = Slice(key.get_data(), key.get_size() - seq_col_length); + DCHECK(_pk_index_reader != nullptr); 
- if (!_pk_index_reader->check_present(key)) { + if (!_pk_index_reader->check_present(key_without_seq)) { return Status::NotFound("Can't find key in the segment"); } bool exact_match = false; std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator; RETURN_IF_ERROR(_pk_index_reader->new_iterator(&index_iterator)); - RETURN_IF_ERROR(index_iterator->seek_at_or_after(&key, &exact_match)); - if (!exact_match) { + RETURN_IF_ERROR(index_iterator->seek_at_or_after(&key_without_seq, &exact_match)); + if (!has_seq_col && !exact_match) { return Status::NotFound("Can't find key in the segment"); } row_location->row_id = index_iterator->get_current_ordinal(); row_location->segment_id = _segment_id; + + if (has_seq_col) { + MemPool pool; + size_t num_to_read = 1; + std::unique_ptr<ColumnVectorBatch> cvb; + RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, _pk_index_reader->type_info(), + nullptr, &cvb)); + ColumnBlock block(cvb.get(), &pool); + ColumnBlockView column_block_view(&block); + size_t num_read = num_to_read; + RETURN_IF_ERROR(index_iterator->next_batch(&num_read, &column_block_view)); + DCHECK(num_to_read == num_read); + + const Slice* sought_key = reinterpret_cast<const Slice*>(cvb->cell_ptr(0)); + Slice sought_key_without_seq = + Slice(sought_key->get_data(), sought_key->get_size() - seq_col_length); + + // compare key + if (key_without_seq.compare(sought_key_without_seq) != 0) { + return Status::NotFound("Can't find key in the segment"); + } + + // compare sequence id + Slice sequence_id = + Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1); + Slice previous_sequence_id = Slice( + sought_key->get_data() + sought_key_without_seq.get_size() + 1, seq_col_length - 1); + if (sequence_id.compare(previous_sequence_id) < 0) { + return Status::AlreadyExist("key with higher sequence id exists"); + } + } + return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp index d041a7a1a0..1f016e1094 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -62,6 +62,12 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, _key_coders.push_back(get_key_coder(column.type())); _key_index_size.push_back(column.index_length()); } + // encode the sequence id into the primary key index + if (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS && + _opts.enable_unique_key_merge_on_write) { + const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx()); + _key_coders.push_back(get_key_coder(column.type())); + } } SegmentWriter::~SegmentWriter() { @@ -116,7 +122,12 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused)) // we don't need the short key index for unique key merge on write table. if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { - _primary_key_index_builder.reset(new PrimaryKeyIndexBuilder(_file_writer)); + size_t seq_col_length = 0; + if (_tablet_schema->has_sequence_col()) { + seq_col_length = + _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1; + } + _primary_key_index_builder.reset(new PrimaryKeyIndexBuilder(_file_writer, seq_col_length)); RETURN_IF_ERROR(_primary_key_index_builder->init()); } else { _short_key_index_builder.reset( @@ -152,7 +163,9 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po if (converted_result.first != Status::OK()) { return converted_result.first; } - if (cid < _num_key_columns) { + if (cid < _num_key_columns || + (_tablet_schema->has_sequence_col() && _tablet_schema->keys_type() == UNIQUE_KEYS && + _opts.enable_unique_key_merge_on_write && cid == _tablet_schema->sequence_col_idx())) { key_columns.push_back(converted_result.second); } 
RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(), @@ -191,8 +204,15 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) { std::string SegmentWriter::_encode_keys( const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos, bool null_first) { - assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns && - _key_index_size.size() == _num_key_columns); + if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write && + _tablet_schema->has_sequence_col()) { + assert(key_columns.size() == _num_key_columns + 1 && + _key_coders.size() == _num_key_columns + 1 && + _key_index_size.size() == _num_key_columns); + } else { + assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns && + _key_index_size.size() == _num_key_columns); + } std::string encoded_keys; size_t cid = 0; @@ -227,6 +247,12 @@ Status SegmentWriter::append_row(const RowType& row) { if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { std::string encoded_key; encode_key<RowType, true, true>(&encoded_key, row, _num_key_columns); + if (_tablet_schema->has_sequence_col()) { + encoded_key.push_back(KEY_NORMAL_MARKER); + auto cid = _tablet_schema->sequence_col_idx(); + auto cell = row.cell(cid); + row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &encoded_key); + } RETURN_IF_ERROR(_primary_key_index_builder->add_item(encoded_key)); } else { // At the beginning of one block, so add a short key index entry diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 9be70259a2..2cf2271c31 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1879,7 +1879,12 @@ TabletSchemaSPtr Tablet::tablet_schema() const { Status Tablet::lookup_row_key(const Slice& encoded_key, const RowsetIdUnorderedSet* rowset_ids, RowLocation* row_location, uint32_t version) { 
std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs; - _rowset_tree->FindRowsetsWithKeyInRange(encoded_key, rowset_ids, &selected_rs); + size_t seq_col_length = 0; + if (_schema->has_sequence_col()) { + seq_col_length = _schema->column(_schema->sequence_col_idx()).length() + 1; + } + Slice key_without_seq = Slice(encoded_key.get_data(), encoded_key.get_size() - seq_col_length); + _rowset_tree->FindRowsetsWithKeyInRange(key_without_seq, rowset_ids, &selected_rs); if (selected_rs.empty()) { return Status::NotFound("No rowsets contains the key in key range"); } @@ -1912,6 +1917,11 @@ Status Tablet::lookup_row_key(const Slice& encoded_key, const RowsetIdUnorderedS loc.rowset_id = rs.first->rowset_id(); if (version >= 0 && _tablet_meta->delete_bitmap().contains_agg( {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { + // if has sequence col, we continue to compare the sequence_id of + // all rowsets, until we find an existing key. + if (_schema->has_sequence_col()) { + continue; + } // The key is deleted, we don't need to search for it any more. 
break; } @@ -1944,6 +1954,7 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id, auto pk_idx = seg->get_primary_key_index(); int cnt = 0; int total = pk_idx->num_rows(); + uint32_t row_id = 0; int32_t remaining = total; bool exact_match = false; std::string last_key; @@ -1984,9 +1995,18 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id, } RowLocation loc; auto st = lookup_row_key(*key, specified_rowset_ids, &loc, dummy_version.first - 1); - CHECK(st.ok() || st.is_not_found()); + CHECK(st.ok() || st.is_not_found() || st.is_already_exist()); if (st.is_not_found()) continue; + + // sequence id smaller than the previous one, so delete current row + if (st.is_already_exist()) { + loc.rowset_id = rowset_id; + loc.segment_id = seg->id(); + loc.row_id = row_id; + } + ++cnt; + ++row_id; delete_bitmap->add({loc.rowset_id, loc.segment_id, dummy_version.first}, loc.row_id); } diff --git a/be/test/olap/primary_key_index_test.cpp b/be/test/olap/primary_key_index_test.cpp index d0bad76ba5..36b4a04b9a 100644 --- a/be/test/olap/primary_key_index_test.cpp +++ b/be/test/olap/primary_key_index_test.cpp @@ -56,7 +56,7 @@ TEST_F(PrimaryKeyIndexTest, builder) { auto fs = io::global_local_filesystem(); EXPECT_TRUE(fs->create_file(filename, &file_writer).ok()); - PrimaryKeyIndexBuilder builder(file_writer.get()); + PrimaryKeyIndexBuilder builder(file_writer.get(), 0); builder.init(); size_t num_rows = 0; std::vector<std::string> keys; diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index df73e8f8f7..105a9f5c3c 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -1300,5 +1300,147 @@ TEST_F(SegmentReaderWriterTest, TestBloomFilterIndexUniqueModel) { build_segment(opts2, schema, schema, 100, DefaultIntGenerator, &seg2); EXPECT_TRUE(column_contains_index(seg2->footer().columns(3), BLOOM_FILTER_INDEX)); } + +TEST_F(SegmentReaderWriterTest, TestLookupRowKey) 
{ + TabletSchemaSPtr tablet_schema = create_schema( + {create_int_key(1), create_int_key(2), create_int_value(3), create_int_value(4)}, + UNIQUE_KEYS); + SegmentWriterOptions opts; + opts.enable_unique_key_merge_on_write = true; + opts.num_rows_per_block = 10; + + shared_ptr<Segment> segment; + build_segment(opts, tablet_schema, tablet_schema, 4096, DefaultIntGenerator, &segment); + + // key exist + { + RowCursor row; + auto olap_st = row.init(tablet_schema); + EXPECT_EQ(Status::OK(), olap_st); + for (size_t rid = 0; rid < 4096; ++rid) { + for (int cid = 0; cid < tablet_schema->num_columns(); ++cid) { + int row_block_id = rid / opts.num_rows_per_block; + RowCursorCell cell = row.cell(cid); + DefaultIntGenerator(rid, cid, row_block_id, cell); + } + std::string encoded_key; + encode_key<RowCursor, true, true>(&encoded_key, row, tablet_schema->num_key_columns()); + RowLocation row_location; + Status st = segment->lookup_row_key(encoded_key, &row_location); + EXPECT_EQ(row_location.row_id, rid); + EXPECT_EQ(st, Status::OK()); + } + } + + // key not exist + { + RowCursor row; + auto olap_st = row.init(tablet_schema); + EXPECT_EQ(Status::OK(), olap_st); + for (size_t rid = 4096; rid < 4100; ++rid) { + for (int cid = 0; cid < tablet_schema->num_columns(); ++cid) { + int row_block_id = rid / opts.num_rows_per_block; + RowCursorCell cell = row.cell(cid); + DefaultIntGenerator(rid, cid, row_block_id, cell); + } + std::string encoded_key; + encode_key<RowCursor, true, true>(&encoded_key, row, tablet_schema->num_key_columns()); + RowLocation row_location; + Status st = segment->lookup_row_key(encoded_key, &row_location); + EXPECT_EQ(st.is_not_found(), true); + } + } +} + +TEST_F(SegmentReaderWriterTest, TestLookupRowKeyWithSequenceCol) { + TabletSchemaSPtr tablet_schema = create_schema( + {create_int_key(1), create_int_key(2), create_int_value(3), create_int_value(4)}, + UNIQUE_KEYS); + tablet_schema->_sequence_col_idx = 3; + SegmentWriterOptions opts; + 
opts.enable_unique_key_merge_on_write = true; + opts.num_rows_per_block = 10; + + shared_ptr<Segment> segment; + build_segment(opts, tablet_schema, tablet_schema, 4096, DefaultIntGenerator, &segment); + + // key exist + { + RowCursor row; + auto olap_st = row.init(tablet_schema); + EXPECT_EQ(Status::OK(), olap_st); + for (size_t rid = 0; rid < 4096; ++rid) { + for (int cid = 0; cid < tablet_schema->num_columns(); ++cid) { + int row_block_id = rid / opts.num_rows_per_block; + RowCursorCell cell = row.cell(cid); + DefaultIntGenerator(rid, cid, row_block_id, cell); + } + std::string encoded_key; + encode_key<RowCursor, true, true>(&encoded_key, row, tablet_schema->num_key_columns()); + encoded_key.push_back(KEY_NORMAL_MARKER); + auto cid = tablet_schema->sequence_col_idx(); + auto cell = row.cell(cid); + row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &encoded_key); + + RowLocation row_location; + Status st = segment->lookup_row_key(encoded_key, &row_location); + EXPECT_EQ(row_location.row_id, rid); + EXPECT_EQ(st, Status::OK()); + } + } + + // key not exist + { + RowCursor row; + auto olap_st = row.init(tablet_schema); + EXPECT_EQ(Status::OK(), olap_st); + for (size_t rid = 4096; rid < 4100; ++rid) { + for (int cid = 0; cid < tablet_schema->num_columns(); ++cid) { + int row_block_id = rid / opts.num_rows_per_block; + RowCursorCell cell = row.cell(cid); + DefaultIntGenerator(rid, cid, row_block_id, cell); + } + std::string encoded_key; + encode_key<RowCursor, true, true>(&encoded_key, row, tablet_schema->num_key_columns()); + + encoded_key.push_back(KEY_NORMAL_MARKER); + auto cid = tablet_schema->sequence_col_idx(); + auto cell = row.cell(cid); + row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &encoded_key); + + RowLocation row_location; + Status st = segment->lookup_row_key(encoded_key, &row_location); + EXPECT_EQ(st.is_not_found(), true); + } + } + + // key exist, sequence id is smaller + { + RowCursor row; + auto olap_st = 
row.init(tablet_schema); + EXPECT_EQ(Status::OK(), olap_st); + for (int cid = 0; cid < tablet_schema->num_columns(); ++cid) { + RowCursorCell cell = row.cell(cid); + cell.set_not_null(); + if (cid == tablet_schema->sequence_col_idx()) { + *(int*)cell.mutable_cell_ptr() = 100 + cid - 3; + } else { + *(int*)cell.mutable_cell_ptr() = 100 + cid; + } + } + std::string encoded_key; + encode_key<RowCursor, true, true>(&encoded_key, row, tablet_schema->num_key_columns()); + + encoded_key.push_back(KEY_NORMAL_MARKER); + auto cid = tablet_schema->sequence_col_idx(); + auto cell = row.cell(cid); + row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &encoded_key); + + RowLocation row_location; + Status st = segment->lookup_row_key(encoded_key, &row_location); + EXPECT_EQ(st.is_already_exist(), true); + } +} + } // namespace segment_v2 -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 632a7b36ab..669f2bfe6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -245,7 +245,9 @@ public class Column implements Writable { } public boolean isSequenceColumn() { - return !visible && aggregationType == AggregateType.REPLACE && nameEquals(SEQUENCE_COL, true); + // aggregationType is NONE for unique table with merge on write. 
+ return !visible && (aggregationType == AggregateType.REPLACE + || aggregationType == AggregateType.NONE) && nameEquals(SEQUENCE_COL, true); } public PrimitiveType getDataType() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 19466c5dc8..8143cbb359 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -885,8 +885,16 @@ public class OlapTable extends Table { this.hasSequenceCol = true; this.sequenceType = type; - // sequence column is value column with REPLACE aggregate type - Column sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.REPLACE).toColumn(); + Column sequenceCol; + if (getEnableUniqueKeyMergeOnWrite()) { + // sequence column is value column with NONE aggregate type for + // unique key table with merge on write + sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.NONE).toColumn(); + } else { + // sequence column is value column with REPLACE aggregate type for + // unique key table + sequenceCol = ColumnDef.newSequenceColumnDef(type, AggregateType.REPLACE).toColumn(); + } // add sequence column at last fullSchema.add(sequenceCol); nameToColumn.put(Column.SEQUENCE_COL, sequenceCol); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 0686f15d9f..433fcd2054 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1903,11 +1903,6 @@ public class InternalCatalog implements CatalogIf<Database> { try { sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, olapTable.getKeysType()); if (sequenceColType != null) { - // TODO(zhannngchen) will support sequence column later. 
- if (olapTable.getEnableUniqueKeyMergeOnWrite()) { - throw new AnalysisException("Unique key table with MoW(merge on write) not support " - + "sequence column for now"); - } olapTable.setSequenceInfo(sequenceColType); } } catch (Exception e) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org