github-actions[bot] commented on code in PR #41839:
URL: https://github.com/apache/doris/pull/41839#discussion_r1800402550


##########
be/src/olap/tablet.cpp:
##########
@@ -3349,74 +3395,145 @@ Status Tablet::generate_new_block_for_partial_update(
     return Status::OK();
 }
 
-// read columns by read plan
-// read_index: ori_pos-> block_idx
-Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema,
-                                    const std::vector<uint32_t> cids_to_read,
-                                    const PartialUpdateReadPlan& read_plan,
-                                    const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
-                                    vectorized::Block& block,
-                                    std::map<uint32_t, uint32_t>* read_index,
-                                    const signed char* __restrict skip_map) {
-    bool has_row_column = tablet_schema->store_row_column();
-    auto mutable_columns = block.mutate_columns();
-    size_t read_idx = 0;
-    for (auto rs_it : read_plan) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset_iter = rsid_to_rowset.find(rs_it.first);
-            CHECK(rowset_iter != rsid_to_rowset.end());
-            std::vector<uint32_t> rids;
-            for (auto [rid, pos] : seg_it.second) {
-                if (skip_map && skip_map[pos]) {
-                    continue;
-                }
-                rids.emplace_back(rid);
-                (*read_index)[pos] = read_idx++;
+Status Tablet::generate_new_block_for_flexible_partial_update(

Review Comment:
   warning: function 'generate_new_block_for_flexible_partial_update' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   ();
                                                  ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/tablet.cpp:3397:** 134 lines including whitespace and comments (threshold 80)
   ```cpp
   ();
                                                  ^
   ```
   
   </details>
   



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -562,145 +540,403 @@
     return Status::OK();
 }
 
-Status VerticalSegmentWriter::_fill_missing_columns(
-        vectorized::MutableColumns& mutable_full_columns,
-        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
-        const size_t& segment_start_pos, const vectorized::Block* block) {
-    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
-        // TODO(plat1ko): CloudStorageEngine
-        return Status::NotSupported("fill_missing_columns");
-    }
+Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(

Review Comment:
   warning: function '_append_block_with_flexible_partial_content' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:542:** 187 lines including whitespace and comments (threshold 80)
   ```cpp
   Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
                                 ^
   ```
   
   </details>
   



##########
be/src/exec/tablet_info.h:
##########
@@ -20,6 +20,7 @@
 #include <butil/fast_rand.h>

Review Comment:
   warning: 'butil/fast_rand.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <butil/fast_rand.h>
            ^
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -616,142 +587,345 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     _olap_data_convertor->clear_source_content();
     return Status::OK();
 }
+Status SegmentWriter::append_block_with_flexible_partial_content(const vectorized::Block* block,

Review Comment:
   warning: function 'append_block_with_flexible_partial_content' has cognitive complexity of 65 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:606:** nesting level 
increased to 1
   ```cpp
       auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* 
block) {
                               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:619:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:619:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:628:** nesting level 
increased to 1
   ```cpp
               [&full_block, &block, &row_pos, &num_rows,
               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:631:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:633:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:633:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:636:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:644:** nesting level 
increased to 1
   ```cpp
       auto encode_seq_column = [&block, &row_pos, &num_rows,
                                ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:647:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (_tablet_schema->has_sequence_col()) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:649:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:649:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:652:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:663:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(encode_key_columns(key_columns));
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:663:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(encode_key_columns(key_columns));
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:670:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(encode_seq_column(seq_column));
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:670:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(encode_seq_column(seq_column));
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:675:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_tablet_schema->has_sequence_col()) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:677:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, 
num_rows, skip_bitmaps,
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:677:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, 
num_rows, skip_bitmaps,
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:680:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (origin_rows != num_rows) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:683:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(encode_key_columns(key_columns));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:683:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(encode_key_columns(key_columns));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:684:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(encode_seq_column(seq_column));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:684:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(encode_seq_column(seq_column));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:694:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:697:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:697:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:705:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(generate_flexible_read_plan(
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:705:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(generate_flexible_read_plan(
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:712:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (config::enable_merge_on_write_correctness_check) {
       ^
   ```
   
   </details>
   



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -562,145 +540,403 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     return Status::OK();
 }
 
-Status VerticalSegmentWriter::_fill_missing_columns(
-        vectorized::MutableColumns& mutable_full_columns,
-        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
-        const size_t& segment_start_pos, const vectorized::Block* block) {
-    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
-        // TODO(plat1ko): CloudStorageEngine
-        return Status::NotSupported("fill_missing_columns");
-    }
+Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(

Review Comment:
   warning: function '_append_block_with_flexible_partial_content' has cognitive complexity of 77 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:561:** nesting 
level increased to 1
   ```cpp
       auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* 
block) {
                               ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:575:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:575:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:584:** nesting 
level increased to 1
   ```cpp
               [&full_block, &data,
               ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:587:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:589:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:589:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:592:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:600:** nesting 
level increased to 1
   ```cpp
       auto encode_seq_column = [&data, &schema_has_sequence_col,
                                ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:603:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (schema_has_sequence_col) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:605:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:605:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:609:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:620:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(encode_key_columns(key_columns));
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:620:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(encode_key_columns(key_columns));
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:627:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(encode_seq_column(seq_column));
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:627:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(encode_seq_column(seq_column));
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:632:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (schema_has_sequence_col) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:634:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, 
key_columns, seq_column,
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:634:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, 
key_columns, seq_column,
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:636:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (origin_rows != data.num_rows) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:639:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(encode_key_columns(key_columns));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:639:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(encode_key_columns(key_columns));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:640:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(encode_seq_column(seq_column));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:640:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(encode_seq_column(seq_column));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:650:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:653:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:653:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:661:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(_generate_flexible_read_plan(
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:661:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(_generate_flexible_read_plan(
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:667:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (config::enable_merge_on_write_correctness_check) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:673:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:673:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:707:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_num_rows_written != data.row_pos ||
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:716:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows; block_pos++) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:718:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (_tablet_schema->has_sequence_col()) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:721:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:721:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/olap/partial_update_info.h:
##########
@@ -16,29 +16,71 @@
 // under the License.
 
 #pragma once
+#include <gen_cpp/olap_file.pb.h>

Review Comment:
   warning: 'gen_cpp/olap_file.pb.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/olap_file.pb.h>
            ^
   ```
   



##########
be/src/olap/partial_update_info.cpp:
##########
@@ -176,4 +263,394 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
     }
     CHECK_EQ(missing_cids.size(), default_values.size());
 }
+
+void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
+    plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos);
+}
+
+// read columns by read plan
+// read_index: ori_pos-> block_idx
+Status FixedReadPlan::read_columns_by_plan(
+        const TabletSchema& tablet_schema, const std::vector<uint32_t> cids_to_read,
+        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, vectorized::Block& block,
+        std::map<uint32_t, uint32_t>* read_index,
+        const signed char* __restrict delete_signs) const {
+    bool has_row_column = tablet_schema.store_row_column();
+    auto mutable_columns = block.mutate_columns();
+    size_t read_idx = 0;
+    for (const auto& [rowset_id, segment_row_mappings] : plan) {
+        for (const auto& [segment_id, mappings] : segment_row_mappings) {
+            auto rowset_iter = rsid_to_rowset.find(rowset_id);
+            CHECK(rowset_iter != rsid_to_rowset.end());
+            std::vector<uint32_t> rids;
+            for (auto [rid, pos] : mappings) {
+                if (delete_signs && delete_signs[pos]) {
+                    continue;
+                }
+                rids.emplace_back(rid);
+                (*read_index)[pos] = read_idx++;
+            }
+            if (has_row_column) {
+                auto st = doris::Tablet::fetch_value_through_row_column(
+                        rowset_iter->second, tablet_schema, segment_id, rids, 
cids_to_read, block);
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to fetch value through row column";
+                    return st;
+                }
+                continue;
+            }
+            for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
+                TabletColumn tablet_column = 
tablet_schema.column(cids_to_read[cid]);
+                auto st = Tablet::fetch_value_by_rowids(rowset_iter->second, 
segment_id, rids,
+                                                        tablet_column, 
mutable_columns[cid]);
+                // set read value to output block
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to fetch value";
+                    return st;
+                }
+            }
+        }
+    }
+    block.set_columns(std::move(mutable_columns));
+    return Status::OK();
+}
+
+Status FixedReadPlan::fill_missing_columns(
+        RowsetWriterContext* rowset_ctx, const std::map<RowsetId, 
RowsetSharedPtr>& rsid_to_rowset,
+        const TabletSchema& tablet_schema, vectorized::Block& full_block,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, const vectorized::Block* block) const 
{
+    auto mutable_full_columns = full_block.mutate_columns();
+    // create old value columns
+    const auto& missing_cids = rowset_ctx->partial_update_info->missing_cids;
+    auto old_value_block = tablet_schema.create_block_by_cids(missing_cids);
+    CHECK_EQ(missing_cids.size(), old_value_block.columns());
+
+    // segment pos to write -> rowid to read in old_value_block
+    std::map<uint32_t, uint32_t> read_index;
+    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, 
rsid_to_rowset,
+                                         old_value_block, &read_index, 
nullptr));
+
+    const auto* delete_sign_column_data = 
Tablet::get_delete_sign_column_data(old_value_block);
+
+    // build default value columns
+    auto default_value_block = old_value_block.clone_empty();
+    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
+        RETURN_IF_ERROR(Tablet::generate_default_value_block(
+                tablet_schema, missing_cids, 
rowset_ctx->partial_update_info->default_values,
+                old_value_block, default_value_block));
+    }
+    auto mutable_default_value_columns = default_value_block.mutate_columns();
+
+    // fill all missing value from mutable_old_columns, need to consider 
default value and null value
+    for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
+        // `use_default_or_null_flag[idx] == false` doesn't mean that we 
should read values from the old row
+        // for the missing columns. For example, if a table has sequence 
column, the rows with DELETE_SIGN column
+        // marked will not be marked in delete bitmap(see 
https://github.com/apache/doris/pull/24011), so it will
+        // be found in Tablet::lookup_row_key() and 
`use_default_or_null_flag[idx]` will be false. But we should not
+        // read values from old rows for missing values in this occasion. So 
we should read the DELETE_SIGN column
+        // to check if a row REALLY exists in the table.
+        auto segment_pos = idx + segment_start_pos;
+        auto pos_in_old_block = read_index[segment_pos];
+        if (use_default_or_null_flag[idx] || (delete_sign_column_data != 
nullptr &&
+                                              
delete_sign_column_data[pos_in_old_block] != 0)) {
+            for (auto i = 0; i < missing_cids.size(); ++i) {
+                // if the column has default value, fill it with default value
+                // otherwise, if the column is nullable, fill it with null 
value
+                const auto& tablet_column = 
tablet_schema.column(missing_cids[i]);
+                auto& missing_col = mutable_full_columns[missing_cids[i]];
+                if (tablet_column.has_default_value()) {
+                    
missing_col->insert_from(*mutable_default_value_columns[i], 0);
+                } else if (tablet_column.is_nullable()) {
+                    auto* nullable_column =
+                            
assert_cast<vectorized::ColumnNullable*>(missing_col.get());
+                    nullable_column->insert_null_elements(1);
+                } else if (tablet_schema.auto_increment_column() == 
tablet_column.name()) {
+                    const auto& column = 
rowset_ctx->tablet_schema->column(tablet_column.name());
+                    DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
+                    auto* auto_inc_column =
+                            
assert_cast<vectorized::ColumnInt64*>(missing_col.get());
+                    auto_inc_column->insert(
+                            (assert_cast<const vectorized::ColumnInt64*>(
+                                     
block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__")
+                                             .column.get()))
+                                    ->get_element(idx));
+                } else {
+                    // If the control flow reaches this branch, the column 
neither has default value
+                    // nor is nullable. It means that the row's delete sign is 
marked, and the value
+                    // columns are useless and won't be read. So we can just 
put arbitrary values in the cells
+                    missing_col->insert_default();
+                }
+            }
+            continue;
+        }
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            mutable_full_columns[missing_cids[i]]->insert_from(
+                    *old_value_block.get_by_position(i).column, 
pos_in_old_block);
+        }
+    }
+    full_block.set_columns(std::move(mutable_full_columns));
+    return Status::OK();
+}
+
+void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t 
pos,

Review Comment:
   warning: method 'prepare_to_read' can be made const 
[readability-make-member-function-const]
   
   be/src/olap/partial_update_info.cpp:396:
   ```diff
   -                                        const BitmapValue& skip_bitmap) {
   +                                        const BitmapValue& skip_bitmap) 
const {
   ```
   
   be/src/olap/partial_update_info.h:133:
   ```diff
   -                          const BitmapValue& skip_bitmap);
   +                          const BitmapValue& skip_bitmap) const;
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -616,142 +587,345 @@
     _olap_data_convertor->clear_source_content();
     return Status::OK();
 }
+Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,

Review Comment:
   warning: function 'append_block_with_flexible_partial_content' exceeds 
recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:589:** 829 lines 
including whitespace and comments (threshold 80)
   ```cpp
   Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
                         ^
   ```
   
   </details>
   



##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -562,145 +540,403 @@
     return Status::OK();
 }
 
-Status VerticalSegmentWriter::_fill_missing_columns(
-        vectorized::MutableColumns& mutable_full_columns,
-        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
-        const size_t& segment_start_pos, const vectorized::Block* block) {
-    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
-        // TODO(plat1ko): CloudStorageEngine
-        return Status::NotSupported("fill_missing_columns");
-    }
+Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
+        RowsInBlock& data, vectorized::Block& full_block) {
+    DCHECK(_is_mow());
+    DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
+    
DCHECK(_opts.rowset_ctx->partial_update_info->is_flexible_partial_update());
+    DCHECK(data.row_pos == 0);
+
+    // data.block has the same schema with full_block
+    DCHECK(data.block->columns() == _tablet_schema->num_columns());
+
     auto tablet = static_cast<Tablet*>(_tablet.get());
-    // create old value columns
-    const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
-    auto old_value_block = _tablet_schema->create_block_by_cids(missing_cids);
-    CHECK_EQ(missing_cids.size(), old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto [rid, pos] : seg_it.second) {
-                rids.emplace_back(rid);
-                read_index[pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = tablet->fetch_value_through_row_column(
-                        rowset, *_tablet_schema, seg_it.first, rids, 
missing_cids, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
+    // create full block and fill with sort key columns
+    full_block = _tablet_schema->create_block();
+
+    auto segment_start_pos = _column_writers.front()->get_next_rowid();
+
+    DCHECK(_tablet_schema->has_skip_bitmap_col());
+    auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
+
+    auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* 
block) {
+        return &(assert_cast<vectorized::ColumnBitmap*>(
+                         
block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get())
+                         ->get_data());
+    };
+    std::vector<BitmapValue>* skip_bitmaps = get_skip_bitmaps(data.block);
+
+    bool has_default_or_nullable = false;
+    std::vector<bool> use_default_or_null_flag;
+    use_default_or_null_flag.reserve(data.num_rows);
+
+    int32_t seq_map_col_unique_id = 
_opts.rowset_ctx->partial_update_info->sequence_map_col_uid();
+    bool schema_has_sequence_col = _tablet_schema->has_sequence_col();
+
+    
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
+                    { sleep(60); })
+    const std::vector<RowsetSharedPtr>& specified_rowsets = 
_mow_context->rowset_ptrs;
+    std::vector<std::unique_ptr<SegmentCacheHandle>> 
segment_caches(specified_rowsets.size());
+
+    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns {};
+    vectorized::IOlapColumnDataAccessor* seq_column {nullptr};
+
+    auto encode_key_columns =
+            [&full_block, &data,
+             this](std::vector<vectorized::IOlapColumnDataAccessor*>& 
key_columns) -> Status {
+        key_columns.clear();
+        for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
+            full_block.replace_by_position(cid, 
data.block->get_by_position(cid).column);
+            
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                    full_block.get_by_position(cid), data.row_pos, 
data.num_rows, cid));
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(cid);
+            if (!status.ok()) {
+                return status;
             }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(missing_cids[cid]);
-                auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                        
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+            key_columns.push_back(column);
+        }
+        return Status::OK();
+    };
+
+    auto encode_seq_column = [&data, &schema_has_sequence_col,
+                              this](vectorized::IOlapColumnDataAccessor*& 
seq_column) -> Status {
+        seq_column = nullptr;
+        if (schema_has_sequence_col) {
+            auto seq_col_idx = _tablet_schema->sequence_col_idx();
+            
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                    data.block->get_by_position(seq_col_idx), data.row_pos, 
data.num_rows,
+                    seq_col_idx));
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(seq_col_idx);
+            if (!status.ok()) {
+                return status;
             }
+            seq_column = column;
+        }
+        return Status::OK();
+    };
+
+    // 1. encode key columns
+    // we can only encode sort key columns currently because all non-key 
columns in flexible partial update
+    // can have missing cells
+    RETURN_IF_ERROR(encode_key_columns(key_columns));
+
+    // 2. encode sequence column
+    // We encode the sequence column even though it may have invalid values 
in some rows because we need to
+    // encode the value of sequence column in key for rows that have a valid 
value in sequence column during
+    // lookup_raw_key. We will encode the sequence column again at the end of 
this method. At that time, we have
+    // a valid sequence column to encode the key with seq col.
+    RETURN_IF_ERROR(encode_seq_column(seq_column));
+
+    // 3. merge duplicate rows when table has sequence column
+    // When there are multiple rows with the same keys in memtable, some of 
them specify the sequence column,
+    // some of them don't. We can't do the de-duplication in memtable. We must 
de-duplicate them here.
+    if (schema_has_sequence_col) {
+        std::size_t origin_rows = data.num_rows;
+        RETURN_IF_ERROR(_merge_rows_for_sequence_column(data, skip_bitmaps, 
key_columns, seq_column,
+                                                        specified_rowsets, 
segment_caches));
+        if (origin_rows != data.num_rows) {
+            // data in block has changed, should re-encode key columns, 
sequence column and re-get skip_bitmaps
+            _olap_data_convertor->clear_source_content();
+            RETURN_IF_ERROR(encode_key_columns(key_columns));
+            RETURN_IF_ERROR(encode_seq_column(seq_column));
+            skip_bitmaps = get_skip_bitmaps(data.block);
         }
     }
-    // build default value columns
-    auto default_value_block = old_value_block.clone_empty();
-    auto mutable_default_value_columns = default_value_block.mutate_columns();
-
-    const vectorized::Int8* delete_sign_column_data = nullptr;
-    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
-                old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr) {
-        auto& delete_sign_col =
-                reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
-        delete_sign_column_data = delete_sign_col.get_data().data();
-    }
-
-    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
-        for (auto i = 0; i < missing_cids.size(); ++i) {
-            const auto& column = _tablet_schema->column(missing_cids[i]);
-            if (column.has_default_value()) {
-                const auto& default_value =
-                        
_opts.rowset_ctx->partial_update_info->default_values[i];
-                vectorized::ReadBuffer 
rb(const_cast<char*>(default_value.c_str()),
-                                          default_value.size());
-                
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
-                        rb, mutable_default_value_columns[i].get()));
+
+    const auto* delete_sign_column_data =
+            Tablet::get_delete_sign_column_data(*data.block, data.row_pos + 
data.num_rows);
+    DCHECK(delete_sign_column_data != nullptr);
+
+    // 4. write key columns data
+    for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
+        const auto& column = key_columns[cid];
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
+                                                     data.num_rows));
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + 
data.num_rows);
+    }
+
+    // 5. generate read plan
+    FlexibleReadPlan read_plan {_tablet_schema->store_row_column()};
+    PartialUpdateStats stats;
+    RETURN_IF_ERROR(_generate_flexible_read_plan(
+            read_plan, data, segment_start_pos, schema_has_sequence_col, 
seq_map_col_unique_id,
+            skip_bitmaps, key_columns, seq_column, delete_sign_column_data, 
specified_rowsets,
+            segment_caches, has_default_or_nullable, use_default_or_null_flag, 
stats));
+    CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);
+
+    if (config::enable_merge_on_write_correctness_check) {
+        
tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
+                                                   _mow_context->rowset_ids);
+    }
+
+    // 6. read according to the plan to fill full_block
+    RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
+            _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
+            use_default_or_null_flag, has_default_or_nullable, 
segment_start_pos, data.row_pos,
+            data.block, skip_bitmaps));
+
+    // TODO(bobhan1): should we replace the skip bitmap column with empty 
bitmaps to reduce storage occupation?
+    // this column is not needed in read path for merge-on-write table
+
+    // 7. fill row store column
+    _serialize_block_to_row_column(full_block);
+
+    // 8. encode and write all non-primary key columns(including sequence 
column if exists)
+    for (auto cid = _num_key_columns; cid < _tablet_schema->num_columns(); 
cid++) {
+        
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                full_block.get_by_position(cid), data.row_pos, data.num_rows, 
cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
+        }
+        if (cid == _tablet_schema->sequence_col_idx()) {
+            // should use the latest encoded sequence column to build the 
primary index
+            seq_column = column;
+        }
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), 
column->get_data(),
+                                                     data.num_rows));
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + 
data.num_rows);
+    }
+
+    _num_rows_updated += stats.num_rows_updated;
+    _num_rows_deleted += stats.num_rows_deleted;
+    _num_rows_new_added += stats.num_rows_new_added;
+    _num_rows_filtered += stats.num_rows_filtered;
+
+    if (_num_rows_written != data.row_pos ||
+        _primary_key_index_builder->num_rows() != _num_rows_written) {
+        return Status::InternalError(
+                "Correctness check failed, _num_rows_written: {}, row_pos: {}, 
primary key "
+                "index builder num rows: {}",
+                _num_rows_written, data.row_pos, 
_primary_key_index_builder->num_rows());
+    }
+
+    // 9. build primary key index
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows; block_pos++) {
+        std::string key = _full_encode_keys(key_columns, block_pos - 
data.row_pos);
+        if (_tablet_schema->has_sequence_col()) {
+            _encode_seq_column(seq_column, block_pos - data.row_pos, &key);
+        }
+        RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+    }
+
+    _num_rows_written += data.num_rows;
+    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+            << "primary key index builder num rows(" << 
_primary_key_index_builder->num_rows()
+            << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
+    _olap_data_convertor->clear_source_content();
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_generate_encoded_default_seq_value(const 
TabletSchema& tablet_schema,
+                                                                  const 
PartialUpdateInfo& info,
+                                                                  std::string* 
encoded_value) {
+    const auto& seq_column = 
tablet_schema.column(tablet_schema.sequence_col_idx());
+    auto block = tablet_schema.create_block_by_cids(
+            {static_cast<uint32_t>(tablet_schema.sequence_col_idx())});
+    if (seq_column.has_default_value()) {
+        auto idx = tablet_schema.sequence_col_idx() - 
tablet_schema.num_key_columns();
+        const auto& default_value = info.default_values[idx];
+        vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()), 
default_value.size());
+        RETURN_IF_ERROR(block.get_by_position(0).type->from_string(
+                rb, block.get_by_position(0).column->assume_mutable().get()));
+
+    } else {
+        block.get_by_position(0).column->assume_mutable()->insert_default();
+    }
+    DCHECK_EQ(block.rows(), 1);
+    auto olap_data_convertor = 
std::make_unique<vectorized::OlapBlockDataConvertor>();
+    olap_data_convertor->add_column_data_convertor(seq_column);
+    olap_data_convertor->set_source_content(&block, 0, 1);
+    auto [status, column] = olap_data_convertor->convert_column_data(0);
+    if (!status.ok()) {
+        return status;
+    }
+    // include marker
+    _encode_seq_column(column, 0, encoded_value);
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_generate_flexible_read_plan(
+        FlexibleReadPlan& read_plan, RowsInBlock& data, size_t 
segment_start_pos,
+        bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
+        std::vector<BitmapValue>* skip_bitmaps,
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+        vectorized::IOlapColumnDataAccessor* seq_column, const signed char* 
delete_sign_column_data,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
+        bool& has_default_or_nullable, std::vector<bool>& 
use_default_or_null_flag,
+        PartialUpdateStats& stats) {
+    int32_t delete_sign_col_unique_id =
+            
_tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
+    int32_t seq_col_unique_id =
+            (_tablet_schema->has_sequence_col()
+                     ? 
_tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id()
+                     : -1);
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows; block_pos++) {
+        size_t delta_pos = block_pos - data.row_pos;
+        size_t segment_pos = segment_start_pos + delta_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+
+        // the hidden sequence column should have the same mark with sequence 
map column
+        if (seq_map_col_unique_id != -1) {
+            DCHECK(schema_has_sequence_col);
+            if (skip_bitmap.contains(seq_map_col_unique_id)) {
+                skip_bitmap.add(seq_col_unique_id);
             }
         }
+
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        _maybe_invalid_row_cache(key);
+        bool row_has_sequence_col =
+                (schema_has_sequence_col && 
!skip_bitmap.contains(seq_col_unique_id));
+        if (row_has_sequence_col) {
+            _encode_seq_column(seq_column, delta_pos, &key);
+        }
+
+        // mark key with delete sign as deleted.
+        bool have_delete_sign = 
(!skip_bitmap.contains(delete_sign_col_unique_id) &&
+                                 delete_sign_column_data[block_pos] != 0);
+
+        auto not_found_cb = [&]() {
+            return 
_opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
+                    *_tablet_schema, &skip_bitmap);
+        };
+        auto update_read_plan = [&](const RowLocation& loc) {
+            read_plan.prepare_to_read(loc, segment_pos, skip_bitmap);
+        };
+
+        RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, 
row_has_sequence_col,
+                                           have_delete_sign, 
specified_rowsets, segment_caches,
+                                           has_default_or_nullable, 
use_default_or_null_flag,
+                                           update_read_plan, not_found_cb, 
stats));
     }
+    return Status::OK();
+}
 
-    // fill all missing value from mutable_old_columns, need to consider 
default value and null value
-    for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
-        // `use_default_or_null_flag[idx] == false` doesn't mean that we 
should read values from the old row
-        // for the missing columns. For example, if a table has sequence 
column, the rows with DELETE_SIGN column
-        // marked will not be marked in delete bitmap(see 
https://github.com/apache/doris/pull/24011), so it will
-        // be found in Tablet::lookup_row_key() and 
`use_default_or_null_flag[idx]` will be false. But we should not
-        // read values from old rows for missing values in this occasion. So 
we should read the DELETE_SIGN column
-        // to check if a row REALLY exists in the table.
-        if (use_default_or_null_flag[idx] ||
-            (delete_sign_column_data != nullptr &&
-             delete_sign_column_data[read_index[idx + segment_start_pos]] != 
0)) {
-            for (auto i = 0; i < missing_cids.size(); ++i) {
-                // if the column has default value, fill it with default value
-                // otherwise, if the column is nullable, fill it with null 
value
-                const auto& tablet_column = 
_tablet_schema->column(missing_cids[i]);
-                if (tablet_column.has_default_value()) {
-                    mutable_full_columns[missing_cids[i]]->insert_from(
-                            *mutable_default_value_columns[i].get(), 0);
-                } else if (tablet_column.is_nullable()) {
-                    auto nullable_column = 
assert_cast<vectorized::ColumnNullable*>(
-                            mutable_full_columns[missing_cids[i]].get());
-                    nullable_column->insert_null_elements(1);
-                } else if (_tablet_schema->auto_increment_column() == 
tablet_column.name()) {
-                    
DCHECK(_opts.rowset_ctx->tablet_schema->column(tablet_column.name()).type() ==
-                           FieldType::OLAP_FIELD_TYPE_BIGINT);
-                    auto auto_inc_column = 
assert_cast<vectorized::ColumnInt64*>(
-                            mutable_full_columns[missing_cids[i]].get());
-                    auto_inc_column->insert(
-                            (assert_cast<const vectorized::ColumnInt64*>(
-                                     
block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)
-                                             .column.get()))
-                                    ->get_element(idx));
-                } else {
-                    // If the control flow reaches this branch, the column 
neither has default value
-                    // nor is nullable. It means that the row's delete sign is 
marked, and the value
-                    // columns are useless and won't be read. So we can just 
put arbitary values in the cells
-                    mutable_full_columns[missing_cids[i]]->insert_default();
-                }
+Status VerticalSegmentWriter::_merge_rows_for_sequence_column(

Review Comment:
   warning: function '_merge_rows_for_sequence_column' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:818:** 92 lines 
including whitespace and comments (threshold 80)
   ```cpp
   Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
                                 ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to