This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dee2e51b92 [fix](cluster key) fix cluster key too many segment after compaction (#44927)
3dee2e51b92 is described below

commit 3dee2e51b9221a5529ce051c065084df46f6c91e
Author: meiyi <me...@selectdb.com>
AuthorDate: Wed Dec 4 11:36:24 2024 +0800

    [fix](cluster key) fix cluster key too many segment after compaction (#44927)
---
 be/src/common/config.cpp                           |  3 +++
 be/src/common/config.h                             |  3 +++
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 31 +++++++++++-----------
 be/src/olap/rowset/segment_v2/segment_writer.h     |  4 +++
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp |  6 ++---
 ...est_compaction_with_multi_append_columns.groovy | 16 +++++++----
 6 files changed, 40 insertions(+), 23 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 63989a76261..b3e7d0bce5e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1166,6 +1166,9 @@ DEFINE_mBool(enable_missing_rows_correctness_check, "false");
 // When the number of missing versions is more than this value, do not directly
 // retry the publish and handle it through async publish.
 DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction. Default 50MB.
+DEFINE_mInt64(mow_primary_key_index_max_size_in_memory, "52428800");
 // When the version is not continuous for MOW table in publish phase and the gap between
 // current txn's publishing version and the max version of the tablet exceeds this value,
 // don't print warning log
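
In effect, the new knob turns segment rollover during compaction into a byte-size decision: the writer tracks how many bytes of encoded primary keys it has buffered and finishes the current segment once the total passes the cap. A minimal sketch of that accounting, with hypothetical names (kPkIndexCapBytes, PkSizeTracker) standing in for the Doris internals:

    #include <cstdint>
    #include <string>
    #include <vector>

    // Hypothetical constant mirroring the 50MB default of
    // mow_primary_key_index_max_size_in_memory.
    constexpr uint64_t kPkIndexCapBytes = 52428800;

    struct PkSizeTracker {
        std::vector<std::string> keys;
        uint64_t key_bytes = 0;

        // Buffers one encoded key; returns true once the accumulated size
        // exceeds the cap, i.e. the caller should finish this segment and
        // open a new one.
        bool add_key(std::string key) {
            key_bytes += key.size();
            keys.emplace_back(std::move(key));
            return key_bytes > kPkIndexCapBytes;
        }
    };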
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29e55e64063..59fc61e8cb3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1236,6 +1236,9 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
 // When the number of missing versions is more than this value, do not directly
 // retry the publish and handle it through async publish.
 DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction.
+DECLARE_mInt64(mow_primary_key_index_max_size_in_memory);
 // When the version is not continuous for MOW table in publish phase and the gap between
 // current txn's publishing version and the max version of the tablet exceeds this value,
 // don't print warning log
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index fc22c3570e5..c6c9664be4b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1016,6 +1016,18 @@ Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {
     *index_size = _file_writer->bytes_appended() - index_start;
     if (_has_key) {
         if (_is_mow_with_cluster_key()) {
+            // 1. sort primary keys
+            std::sort(_primary_keys.begin(), _primary_keys.end());
+            // 2. write primary keys index
+            std::string last_key;
+            for (const auto& key : _primary_keys) {
+                DCHECK(key.compare(last_key) > 0)
+                        << "found duplicate key or key is not sorted! current key: " << key
+                        << ", last key: " << last_key;
+                RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+                last_key = key;
+            }
+
             RETURN_IF_ERROR(_write_short_key_index());
             *index_size = _file_writer->bytes_appended() - index_start;
             RETURN_IF_ERROR(_write_primary_key_index());
@@ -1236,27 +1248,16 @@ Status SegmentWriter::_generate_primary_key_index(
             last_key = std::move(key);
         }
     } else { // mow table with cluster key
-        // 1. generate primary keys in memory
-        std::vector<std::string> primary_keys;
+        // generate primary keys in memory
         for (uint32_t pos = 0; pos < num_rows; pos++) {
             std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos);
             _maybe_invalid_row_cache(key);
             if (_tablet_schema->has_sequence_col()) {
                 _encode_seq_column(seq_column, pos, &key);
             }
-            _encode_rowid(pos, &key);
-            primary_keys.emplace_back(std::move(key));
-        }
-        // 2. sort primary keys
-        std::sort(primary_keys.begin(), primary_keys.end());
-        // 3. write primary keys index
-        std::string last_key;
-        for (const auto& key : primary_keys) {
-            DCHECK(key.compare(last_key) > 0)
-                    << "found duplicate key or key is not sorted! current key: " << key
-                    << ", last key: " << last_key;
-            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
-            last_key = key;
+            _encode_rowid(pos + _num_rows_written, &key);
+            _primary_keys_size += key.size();
+            _primary_keys.emplace_back(std::move(key));
         }
     }
     return Status::OK();
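
Taken together, the two hunks above move the sort out of the per-block path: _generate_primary_key_index now only buffers keys (offsetting each block's row ids by _num_rows_written so they stay unique across blocks), and finalize_columns_index sorts the whole buffer once before building the index. A sketch of that accumulate-then-sort pattern, with hypothetical names, where the real code would feed the sorted keys into _primary_key_index_builder:

    #include <algorithm>
    #include <cstdint>
    #include <string>
    #include <vector>

    class PkBufferSketch {
    public:
        // Called once per appended block; rows_before_block plays the role
        // of _num_rows_written so row ids stay unique within the segment.
        void append_block(const std::vector<std::string>& block_keys,
                          uint32_t rows_before_block) {
            for (uint32_t pos = 0; pos < block_keys.size(); ++pos) {
                std::string key = block_keys[pos];
                encode_rowid(rows_before_block + pos, &key);
                bytes_ += key.size();
                keys_.emplace_back(std::move(key));
            }
        }

        // Called once at finalize: a single sort over all buffered keys
        // replaces the per-block sort that previously forced a fresh
        // segment writer for every key block.
        std::vector<std::string> finalize() {
            std::sort(keys_.begin(), keys_.end());
            return std::move(keys_);
        }

        uint64_t bytes() const { return bytes_; }

    private:
        // Big-endian encoding so the lexicographic order of the suffix
        // matches the numeric row id order.
        static void encode_rowid(uint32_t rowid, std::string* key) {
            for (int shift = 24; shift >= 0; shift -= 8) {
                key->push_back(static_cast<char>((rowid >> shift) & 0xFF));
            }
        }

        std::vector<std::string> keys_;
        uint64_t bytes_ = 0;
    };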
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index 9a8af131087..a1b7491a669 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -155,6 +155,8 @@ public:
         return Status::OK();
     }
 
+    uint64_t primary_keys_size() const { return _primary_keys_size; }
+
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
     Status _create_column_writer(uint32_t cid, const TabletColumn& column,
@@ -260,6 +262,8 @@ private:
     std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
     // contains auto generated columns, should be nullptr if no variants's subcolumns
     TabletSchemaSPtr _flush_schema = nullptr;
+    std::vector<std::string> _primary_keys;
+    uint64_t _primary_keys_size = 0;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index ee9bfd97745..f493f21ac97 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -72,10 +72,9 @@ Status VerticalBetaRowsetWriter<T>::add_columns(const vectorized::Block* block,
         _cur_writer_idx = 0;
         RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
     } else if (is_key) {
-        // TODO for cluster key, always create new segment writer because the primary keys are
-        // sorted in SegmentWriter::_generate_primary_key_index, will cause too many segments
         if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment ||
-            has_cluster_key) {
+            (has_cluster_key && _segment_writers[_cur_writer_idx]->primary_keys_size() >
+                                        config::mow_primary_key_index_max_size_in_memory)) {
             // segment is full, need flush columns and create new segment writer
             RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get(), true));
 
@@ -181,6 +180,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
     writer_options.enable_unique_key_merge_on_write = context.enable_unique_key_merge_on_write;
     writer_options.rowset_ctx = &context;
     writer_options.max_rows_per_segment = context.max_rows_per_segment;
+    // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
     *writer = std::make_unique<segment_v2::SegmentWriter>(
             segment_file_writer.get(), seg_id, context.tablet_schema, context.tablet,
             context.data_dir, writer_options, inverted_index_file_writer.get());
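
Condensed, the rollover predicate in add_columns now reads: finish the segment when it holds too many rows, or, for cluster key tables, when its buffered primary key index has outgrown the byte cap. Restated as a free function for illustration (not the Doris API):

    #include <cstdint>

    // All parameters are stand-ins: rows_written / primary_keys_bytes come
    // from the current segment writer, pk_index_cap_bytes from
    // config::mow_primary_key_index_max_size_in_memory (50MB by default).
    bool should_finish_segment(uint64_t rows_written, uint64_t max_rows_per_segment,
                               bool has_cluster_key, uint64_t primary_keys_bytes,
                               uint64_t pk_index_cap_bytes) {
        return rows_written > max_rows_per_segment ||
               (has_cluster_key && primary_keys_bytes > pk_index_cap_bytes);
    }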
diff --git a/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy b/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
index 8403b17cce5..acac719b8c5 100644
--- a/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
+++ b/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
@@ -128,11 +128,17 @@ suite("test_compaction_with_multi_append_columns", "p0") {
             assertEquals("success", compactJson.status.toLowerCase())
         }
 
-        (code, out, err) = be_show_tablet_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
-        logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
-        assertEquals(code, 0)
-        def json = parseJson(out.trim())
-        logger.info("tablet rowset: " + json)
+        for (int i = 0; i < 10; i++) {
+            (code, out, err) = be_show_tablet_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("loop " + i + ", Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def json = parseJson(out.trim())
+            logger.info("tablet rowsets: " + json)
+            if (json.rowsets.size() <= 5) {
+                break
+            }
+            sleep(2000)
+        }
     }
     checkNoDuplicatedKeys(tableName)
 
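The test change swaps a one-shot assertion for a bounded poll: query tablet status up to 10 times, sleep 2 seconds between attempts, and stop early once compaction has reduced the tablet to at most 5 rowsets. The same retry shape, sketched generically in C++ with a hypothetical check callback (the real test reads the rowset count over HTTP):

    #include <chrono>
    #include <thread>

    // Polls 'check' until it returns true or attempts run out; mirrors the
    // 10 x 2s loop in the Groovy test above.
    template <typename CheckFn>
    bool wait_until(CheckFn check, int attempts = 10,
                    std::chrono::milliseconds delay = std::chrono::milliseconds(2000)) {
        for (int i = 0; i < attempts; ++i) {
            if (check()) return true;           // e.g. rowsets.size() <= 5
            std::this_thread::sleep_for(delay); // give compaction time to settle
        }
        return false;
    }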

