This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 3dee2e51b92 [fix](cluster key) fix cluster key too many segment after compaction (#44927)
3dee2e51b92 is described below

commit 3dee2e51b9221a5529ce051c065084df46f6c91e
Author: meiyi <me...@selectdb.com>
AuthorDate: Wed Dec 4 11:36:24 2024 +0800

    [fix](cluster key) fix cluster key too many segment after compaction (#44927)
---
 be/src/common/config.cpp                           |  3 +++
 be/src/common/config.h                             |  3 +++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 31 +++++++++++-----------
 be/src/olap/rowset/segment_v2/segment_writer.h     |  4 +++
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp |  6 ++---
 ...est_compaction_with_multi_append_columns.groovy | 16 +++++++----
 6 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 63989a76261..b3e7d0bce5e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1166,6 +1166,9 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
 // When the number of missing versions is more than this value, do not directly
 // retry the publish and handle it through async publish.
 DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction. Default 50MB.
+DEFINE_mInt64(mow_primary_key_index_max_size_in_memory, "52428800");
 // When the version is not continuous for MOW table in publish phase and the gap between
 // current txn's publishing version and the max version of the tablet exceeds this value,
 // don't print warning log
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29e55e64063..59fc61e8cb3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1236,6 +1236,9 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
 // When the number of missing versions is more than this value, do not directly
 // retry the publish and handle it through async publish.
 DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction.
+DECLARE_mInt64(mow_primary_key_index_max_size_in_memory);
 // When the version is not continuous for MOW table in publish phase and the gap between
 // current txn's publishing version and the max version of the tablet exceeds this value,
 // don't print warning log
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index fc22c3570e5..c6c9664be4b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1016,6 +1016,18 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {
     *index_size = _file_writer->bytes_appended() - index_start;
     if (_has_key) {
         if (_is_mow_with_cluster_key()) {
+            // 1. sort primary keys
+            std::sort(_primary_keys.begin(), _primary_keys.end());
+            // 2. write primary keys index
+            std::string last_key;
+            for (const auto& key : _primary_keys) {
+                DCHECK(key.compare(last_key) > 0)
+                        << "found duplicate key or key is not sorted! current key: " << key
+                        << ", last key: " << last_key;
+                RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+                last_key = key;
+            }
+
             RETURN_IF_ERROR(_write_short_key_index());
             *index_size = _file_writer->bytes_appended() - index_start;
             RETURN_IF_ERROR(_write_primary_key_index());
@@ -1236,27 +1248,16 @@ Status SegmentWriter::_generate_primary_key_index(
             last_key = std::move(key);
         }
     } else { // mow table with cluster key
-        // 1. generate primary keys in memory
-        std::vector<std::string> primary_keys;
+        // generate primary keys in memory
         for (uint32_t pos = 0; pos < num_rows; pos++) {
             std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos);
             _maybe_invalid_row_cache(key);
             if (_tablet_schema->has_sequence_col()) {
                 _encode_seq_column(seq_column, pos, &key);
             }
-            _encode_rowid(pos, &key);
-            primary_keys.emplace_back(std::move(key));
-        }
-        // 2. sort primary keys
-        std::sort(primary_keys.begin(), primary_keys.end());
-        // 3. write primary keys index
-        std::string last_key;
-        for (const auto& key : primary_keys) {
-            DCHECK(key.compare(last_key) > 0)
-                    << "found duplicate key or key is not sorted! current key: " << key
-                    << ", last key: " << last_key;
-            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
-            last_key = key;
+            _encode_rowid(pos + _num_rows_written, &key);
+            _primary_keys_size += key.size();
+            _primary_keys.emplace_back(std::move(key));
         }
     }
     return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 9a8af131087..a1b7491a669 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -155,6 +155,8 @@ public:
         return Status::OK();
     }
 
+    uint64_t primary_keys_size() const { return _primary_keys_size; }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
     Status _create_column_writer(uint32_t cid, const TabletColumn& column,
@@ -260,6 +262,8 @@ private:
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
     // contains auto generated columns, should be nullptr if no variants's subcolumns
     TabletSchemaSPtr _flush_schema = nullptr;
+    std::vector<std::string> _primary_keys;
+    uint64_t _primary_keys_size = 0;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index ee9bfd97745..f493f21ac97 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -72,10 +72,9 @@ Status VerticalBetaRowsetWriter<T>::add_columns(const vectorized::Block* block,
         _cur_writer_idx = 0;
         RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
     } else if (is_key) {
-        // TODO for cluster key, always create new segment writer because the primary keys are
-        // sorted in SegmentWriter::_generate_primary_key_index, will cause too many segments
         if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment ||
-            has_cluster_key) {
+            (has_cluster_key && _segment_writers[_cur_writer_idx]->primary_keys_size() >
+                                        config::mow_primary_key_index_max_size_in_memory)) {
             // segment is full, need flush columns and create new segment writer
             RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get(), true));
 
@@ -181,6 +180,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
     writer_options.enable_unique_key_merge_on_write = context.enable_unique_key_merge_on_write;
     writer_options.rowset_ctx = &context;
     writer_options.max_rows_per_segment = context.max_rows_per_segment;
+    // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
    *writer = std::make_unique<segment_v2::SegmentWriter>(
             segment_file_writer.get(), seg_id, context.tablet_schema, context.tablet,
             context.data_dir, writer_options, inverted_index_file_writer.get());
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy b/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
index 8403b17cce5..acac719b8c5 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
@@ -128,11 +128,17 @@ suite("test_compaction_with_multi_append_columns", "p0") {
             assertEquals("success", compactJson.status.toLowerCase())
         }
 
-        (code, out, err) = be_show_tablet_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
-        logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
-        assertEquals(code, 0)
-        def json = parseJson(out.trim())
-        logger.info("tablet rowset: " + json)
+        for (int i = 0; i < 10; i++) {
+            (code, out, err) = be_show_tablet_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("loop " + i + ", Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def json = parseJson(out.trim())
+            logger.info("tablet rowsets: " + json)
+            if (json.rowsets.size() <= 5) {
+                break
+            }
+            sleep(2000)
+        }
     }
 
     checkNoDuplicatedKeys(tableName)
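In brief, the patch replaces the old "always cut a new segment for cluster keys" behavior with a memory-bounded rollover: each SegmentWriter buffers its encoded primary keys in memory, and the vertical (compaction) writer only finishes the current segment once that buffer exceeds config::mow_primary_key_index_max_size_in_memory. The sketch below is an illustrative, self-contained rendering of that logic, not the Doris BE code itself; SegmentSketch, buffer_key, finish, should_start_new_segment, and kPrimaryKeyIndexMaxSizeInMemory are hypothetical stand-ins for SegmentWriter's _primary_keys/_primary_keys_size, finalize_columns_index, and the check in VerticalBetaRowsetWriter::add_columns.

    #include <algorithm>
    #include <cstdint>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for config::mow_primary_key_index_max_size_in_memory
    // (the new knob above, default 52428800 bytes = 50MB).
    static constexpr uint64_t kPrimaryKeyIndexMaxSizeInMemory = 50ULL * 1024 * 1024;

    // Simplified per-segment state, mirroring the new _primary_keys /
    // _primary_keys_size members of SegmentWriter: encoded primary keys are
    // buffered in memory and their accumulated size is tracked.
    struct SegmentSketch {
        std::vector<std::string> primary_keys;
        uint64_t primary_keys_size = 0;
        uint32_t num_rows_written = 0;

        void buffer_key(std::string key) {
            primary_keys_size += key.size();
            primary_keys.push_back(std::move(key));
            ++num_rows_written;
        }

        // At finalize time the buffered keys are sorted once and written out,
        // as SegmentWriter::finalize_columns_index now does for cluster keys.
        void finish() {
            std::sort(primary_keys.begin(), primary_keys.end());
            // ... add each sorted key to the primary key index builder here ...
            primary_keys.clear();
            primary_keys_size = 0;
        }
    };

    // Rollover decision used by the compaction-side writer: before the fix a
    // cluster-key MOW compaction cut a new segment for every appended key batch;
    // now it only does so when the buffered primary key index would exceed the
    // memory limit, or when the usual row limit is hit.
    bool should_start_new_segment(const SegmentSketch& seg, uint32_t max_rows_per_segment,
                                  bool has_cluster_key) {
        return seg.num_rows_written > max_rows_per_segment ||
               (has_cluster_key && seg.primary_keys_size > kPrimaryKeyIndexMaxSizeInMemory);
    }

Deferring the sort to finalize_columns_index is what makes the buffering possible: keys from many appended key batches can land in one segment and still reach the primary key index in sorted order, which is why compaction no longer produces an excessive number of segments.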
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org