github-actions[bot] commented on code in PR #41839:
URL: https://github.com/apache/doris/pull/41839#discussion_r1800406205


##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -616,142 +587,345 @@
     _olap_data_convertor->clear_source_content();
     return Status::OK();
 }
+Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,

Review Comment:
   warning: function 'append_block_with_flexible_partial_content' exceeds 
recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:589:** 181 lines 
including whitespace and comments (threshold 80)
   ```cpp
   Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
                         ^
   ```
   
   </details>
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -616,142 +587,345 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     _olap_data_convertor->clear_source_content();
     return Status::OK();
 }
+Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,

Review Comment:
   warning: function 'append_block_with_flexible_partial_content' has cognitive 
complexity of 72 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:606:** nesting level 
increased to 1
   ```cpp
       auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* 
block) {
                               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:619:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:619:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:628:** nesting level 
increased to 1
   ```cpp
               [&full_block, &block, &row_pos, &num_rows,
               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:631:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:633:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:633:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:636:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:644:** nesting level 
increased to 1
   ```cpp
       auto encode_seq_column = [&block, &row_pos, &num_rows,
                                ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:647:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (_tablet_schema->has_sequence_col()) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:649:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:649:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:652:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!status.ok()) {
               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:663:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(encode_key_columns(key_columns));
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:663:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(encode_key_columns(key_columns));
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:670:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(encode_seq_column(seq_column));
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:670:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(encode_seq_column(seq_column));
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:675:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_tablet_schema->has_sequence_col()) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:677:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, 
num_rows, skip_bitmaps,
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:677:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, 
num_rows, skip_bitmaps,
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:680:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (origin_rows != num_rows) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:683:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(encode_key_columns(key_columns));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:683:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(encode_key_columns(key_columns));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:684:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(encode_seq_column(seq_column));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:684:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(encode_seq_column(seq_column));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:694:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:697:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:697:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(
           ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:705:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(generate_flexible_read_plan(
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:705:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(generate_flexible_read_plan(
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:712:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (config::enable_merge_on_write_correctness_check) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:719:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:719:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:753:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_num_rows_written != row_pos ||
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:762:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(
       ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:762:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(
       ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -616,142 +587,345 @@
     _olap_data_convertor->clear_source_content();
     return Status::OK();
 }
+Status SegmentWriter::append_block_with_flexible_partial_content(const 
vectorized::Block* block,
+                                                                 size_t 
row_pos, size_t num_rows) {
+    DCHECK(_is_mow());
+    DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
+    
DCHECK(_opts.rowset_ctx->partial_update_info->is_flexible_partial_update());
+    DCHECK_EQ(row_pos, 0);
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos,
-                                           const vectorized::Block* block) {
-    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
-        // TODO(plat1ko): cloud mode
-        return Status::NotSupported("fill_missing_columns");
-    }
-    auto tablet = static_cast<Tablet*>(_tablet.get());
-    // create old value columns
-    const auto& cids_missing = 
_opts.rowset_ctx->partial_update_info->missing_cids;
-    auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
-    CHECK_EQ(cids_missing.size(), old_value_block.columns());
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto [rid, pos] : seg_it.second) {
-                rids.emplace_back(rid);
-                read_index[pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = tablet->fetch_value_through_row_column(
-                        rowset, *_tablet_schema, seg_it.first, rids, 
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
+    // block has the same schema with full_block
+    DCHECK(block->columns() == _tablet_schema->num_columns());
+
+    // create full block and fill with sort key columns
+    auto full_block = _tablet_schema->create_block();
+
+    auto segment_start_pos = _column_writers.front()->get_next_rowid();
+
+    DCHECK(_tablet_schema->has_skip_bitmap_col());
+    auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
+    auto get_skip_bitmaps = [&skip_bitmap_col_idx](const vectorized::Block* 
block) {
+        return &(assert_cast<vectorized::ColumnBitmap*>(
+                         
block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get())
+                         ->get_data());
+    };
+    std::vector<BitmapValue>* skip_bitmaps = get_skip_bitmaps(block);
+
+    bool has_default_or_nullable = false;
+    std::vector<bool> use_default_or_null_flag;
+    use_default_or_null_flag.reserve(num_rows);
+
+    int32_t seq_map_col_unique_id = 
_opts.rowset_ctx->partial_update_info->sequence_map_col_uid();
+
+    
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
+                    { sleep(60); })
+    const std::vector<RowsetSharedPtr>& specified_rowsets = 
_mow_context->rowset_ptrs;
+    std::vector<std::unique_ptr<SegmentCacheHandle>> 
segment_caches(specified_rowsets.size());
+
+    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns {};
+    vectorized::IOlapColumnDataAccessor* seq_column {nullptr};
+
+    auto encode_key_columns =
+            [&full_block, &block, &row_pos, &num_rows,
+             this](std::vector<vectorized::IOlapColumnDataAccessor*>& 
key_columns) -> Status {
+        key_columns.clear();
+        for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
+            full_block.replace_by_position(cid, 
block->get_by_position(cid).column);
+            
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                    full_block.get_by_position(cid), row_pos, num_rows, cid));
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(cid);
+            if (!status.ok()) {
+                return status;
             }
-            auto mutable_old_columns = old_value_block.mutate_columns();
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                        
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+            key_columns.push_back(column);
+        }
+        return Status::OK();
+    };
+
+    auto encode_seq_column = [&block, &row_pos, &num_rows,
+                              this](vectorized::IOlapColumnDataAccessor*& 
seq_column) -> Status {
+        seq_column = nullptr;
+        if (_tablet_schema->has_sequence_col()) {
+            auto seq_col_idx = _tablet_schema->sequence_col_idx();
+            
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                    block->get_by_position(seq_col_idx), row_pos, num_rows, 
seq_col_idx));
+            auto [status, column] = 
_olap_data_convertor->convert_column_data(seq_col_idx);
+            if (!status.ok()) {
+                return status;
             }
-            old_value_block.set_columns(std::move(mutable_old_columns));
+            seq_column = column;
+        }
+        return Status::OK();
+    };
+
+    // 1. encode key columns
+    // we can only encode sort key columns currently becasue all non-key 
columns in flexible partial update
+    // can have missing cells
+    RETURN_IF_ERROR(encode_key_columns(key_columns));
+
+    // 2. encode sequence column
+    // We encode the seguence column even thought it may have invalid values 
in some rows because we need to
+    // encode the value of sequence column in key for rows that have a valid 
value in sequence column during
+    // lookup_raw_key. We will encode the sequence column again at the end of 
this method. At that time, we have
+    // a valid sequence column to encode the key with seq col.
+    RETURN_IF_ERROR(encode_seq_column(seq_column));
+
+    // 3. merge duplicate rows when table has sequence column
+    // When there are multiple rows with the same keys in memtable, some of 
them specify specify the sequence column,
+    // some of them don't. We can't do the de-duplication in memtable. We must 
de-duplicate them here.
+    if (_tablet_schema->has_sequence_col()) {
+        std::size_t origin_rows = num_rows;
+        RETURN_IF_ERROR(merge_rows_for_sequence_column(block, row_pos, 
num_rows, skip_bitmaps,
+                                                       key_columns, 
seq_column, specified_rowsets,
+                                                       segment_caches));
+        if (origin_rows != num_rows) {
+            // data in block has changed, should re-encode key columns, 
sequence column and re-get skip_bitmaps
+            _olap_data_convertor->clear_source_content();
+            RETURN_IF_ERROR(encode_key_columns(key_columns));
+            RETURN_IF_ERROR(encode_seq_column(seq_column));
+            skip_bitmaps = get_skip_bitmaps(block);
+        }
+    }
+
+    const auto* delete_sign_column_data =
+            Tablet::get_delete_sign_column_data(*block, row_pos + num_rows);
+    DCHECK(delete_sign_column_data != nullptr);
+
+    // 4. write key columns data
+    for (std::size_t cid {0}; cid < _num_key_columns; cid++) {
+        const auto& column = key_columns[cid];
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
+        RETURN_IF_ERROR(
+                _column_writers[cid]->append(column->get_nullmap(), 
column->get_data(), num_rows));
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + 
num_rows);
+    }
+
+    // 5. genreate read plan
+    FlexibleReadPlan read_plan {_tablet_schema->store_row_column()};
+    PartialUpdateStats stats;
+    RETURN_IF_ERROR(generate_flexible_read_plan(
+            read_plan, row_pos, num_rows, segment_start_pos, 
_tablet_schema->has_sequence_col(),
+            seq_map_col_unique_id, skip_bitmaps, key_columns, seq_column, 
delete_sign_column_data,
+            specified_rowsets, segment_caches, has_default_or_nullable, 
use_default_or_null_flag,
+            stats));
+    CHECK_EQ(use_default_or_null_flag.size(), num_rows);
+
+    if (config::enable_merge_on_write_correctness_check) {
+        static_cast<Tablet*>(_tablet.get())
+                
->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
+                                                     _mow_context->rowset_ids);
+    }
+
+    // 6. read according plan to fill full_block
+    RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
+            _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
+            use_default_or_null_flag, has_default_or_nullable, 
segment_start_pos, row_pos, block,
+            skip_bitmaps));
+
+    // TODO(bobhan1): should we replace the skip bitmap column with empty 
bitmaps to reduce storage occupation?
+    // this column is not needed in read path for merge-on-write table
+
+    // 7. fill row store column
+    _serialize_block_to_row_column(full_block);
+
+    // 8. encode and write all non-primary key columns(including sequence 
column if exists)
+    for (auto cid = _num_key_columns; cid < _tablet_schema->num_columns(); 
cid++) {
+        
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                full_block.get_by_position(cid), row_pos, num_rows, cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
         }
+        if (cid == _tablet_schema->sequence_col_idx()) {
+            // should use the latest encoded sequence column to build the 
primary index
+            seq_column = column;
+        }
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
+        RETURN_IF_ERROR(
+                _column_writers[cid]->append(column->get_nullmap(), 
column->get_data(), num_rows));
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + 
num_rows);
+    }
+
+    _num_rows_updated += stats.num_rows_updated;
+    _num_rows_deleted += stats.num_rows_deleted;
+    _num_rows_new_added += stats.num_rows_new_added;
+    _num_rows_filtered += stats.num_rows_filtered;
+
+    if (_num_rows_written != row_pos ||
+        _primary_key_index_builder->num_rows() != _num_rows_written) {
+        return Status::InternalError(
+                "Correctness check failed, _num_rows_written: {}, row_pos: {}, 
primary key "
+                "index builder num rows: {}",
+                _num_rows_written, row_pos, 
_primary_key_index_builder->num_rows());
     }
-    // build default value columns
-    auto default_value_block = old_value_block.clone_empty();
-    auto mutable_default_value_columns = default_value_block.mutate_columns();
-
-    const vectorized::Int8* delete_sign_column_data = nullptr;
-    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
-                old_value_block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr) {
-        auto& delete_sign_col =
-                reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
-        delete_sign_column_data = delete_sign_col.get_data().data();
-    }
-
-    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            const auto& column = _tablet_schema->column(cids_missing[i]);
-            if (column.has_default_value()) {
-                const auto& default_value =
-                        
_opts.rowset_ctx->partial_update_info->default_values[i];
-                vectorized::ReadBuffer 
rb(const_cast<char*>(default_value.c_str()),
-                                          default_value.size());
-                
RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string(
-                        rb, mutable_default_value_columns[i].get()));
+
+    // 9. build primary key index
+    RETURN_IF_ERROR(
+            _generate_primary_key_index(_key_coders, key_columns, seq_column, 
num_rows, false));
+
+    _num_rows_written += num_rows;
+    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+            << "primary key index builder num rows(" << 
_primary_key_index_builder->num_rows()
+            << ") not equal to segment writer's num rows written(" << 
_num_rows_written << ")";
+    _olap_data_convertor->clear_source_content();
+    return Status::OK();
+}
+
+Status SegmentWriter::generate_flexible_read_plan(
+        FlexibleReadPlan& read_plan, size_t row_pos, size_t num_rows, size_t 
segment_start_pos,
+        bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
+        std::vector<BitmapValue>* skip_bitmaps,
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+        vectorized::IOlapColumnDataAccessor* seq_column, const signed char* 
delete_sign_column_data,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
+        bool& has_default_or_nullable, std::vector<bool>& 
use_default_or_null_flag,
+        PartialUpdateStats& stats) {
+    int32_t delete_sign_col_unique_id =
+            
_tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
+    int32_t seq_col_unique_id =
+            (_tablet_schema->has_sequence_col()
+                     ? 
_tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id()
+                     : -1);
+    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; 
block_pos++) {
+        size_t delta_pos = block_pos - row_pos;
+        size_t segment_pos = segment_start_pos + delta_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+
+        // the hidden sequence column should have the same mark with sequence 
map column
+        if (seq_map_col_unique_id != -1) {
+            DCHECK(schema_has_sequence_col);
+            if (skip_bitmap.contains(seq_map_col_unique_id)) {
+                skip_bitmap.add(seq_col_unique_id);
             }
         }
+
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        _maybe_invalid_row_cache(key);
+        bool row_has_sequence_col =
+                (schema_has_sequence_col && 
!skip_bitmap.contains(seq_col_unique_id));
+        if (row_has_sequence_col) {
+            _encode_seq_column(seq_column, delta_pos, &key);
+        }
+
+        // mark key with delete sign as deleted.
+        bool have_delete_sign = 
(!skip_bitmap.contains(delete_sign_col_unique_id) &&
+                                 delete_sign_column_data[block_pos] != 0);
+
+        auto not_found_cb = [&]() {
+            return 
_opts.rowset_ctx->partial_update_info->handle_non_strict_mode_not_found_error(
+                    *_tablet_schema, &skip_bitmap);
+        };
+        auto update_read_plan = [&](const RowLocation& loc) {
+            read_plan.prepare_to_read(loc, segment_pos, skip_bitmap);
+        };
+
+        RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, 
row_has_sequence_col,
+                                          have_delete_sign, specified_rowsets, 
segment_caches,
+                                          has_default_or_nullable, 
use_default_or_null_flag,
+                                          update_read_plan, not_found_cb, 
stats));
     }
+    return Status::OK();
+}
 
-    // fill all missing value from mutable_old_columns, need to consider 
default value and null value
-    for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
-        // `use_default_or_null_flag[idx] == false` doesn't mean that we 
should read values from the old row
-        // for the missing columns. For example, if a table has sequence 
column, the rows with DELETE_SIGN column
-        // marked will not be marked in delete bitmap(see 
https://github.com/apache/doris/pull/24011), so it will
-        // be found in Tablet::lookup_row_key() and 
`use_default_or_null_flag[idx]` will be false. But we should not
-        // read values from old rows for missing values in this occasion. So 
we should read the DELETE_SIGN column
-        // to check if a row REALLY exists in the table.
-        if (use_default_or_null_flag[idx] ||
-            (delete_sign_column_data != nullptr &&
-             delete_sign_column_data[read_index[idx + segment_start_pos]] != 
0)) {
-            for (auto i = 0; i < cids_missing.size(); ++i) {
-                // if the column has default value, fill it with default value
-                // otherwise, if the column is nullable, fill it with null 
value
-                const auto& tablet_column = 
_tablet_schema->column(cids_missing[i]);
-                if (tablet_column.has_default_value()) {
-                    mutable_full_columns[cids_missing[i]]->insert_from(
-                            *mutable_default_value_columns[i].get(), 0);
-                } else if (tablet_column.is_nullable()) {
-                    auto nullable_column = 
assert_cast<vectorized::ColumnNullable*>(
-                            mutable_full_columns[cids_missing[i]].get());
-                    nullable_column->insert_null_elements(1);
-                } else if (_tablet_schema->auto_increment_column() == 
tablet_column.name()) {
-                    
DCHECK(_opts.rowset_ctx->tablet_schema->column(tablet_column.name()).type() ==
-                           FieldType::OLAP_FIELD_TYPE_BIGINT);
-                    auto auto_inc_column = 
assert_cast<vectorized::ColumnInt64*>(
-                            mutable_full_columns[cids_missing[i]].get());
-                    auto_inc_column->insert(
-                            (assert_cast<const vectorized::ColumnInt64*>(
-                                     
block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)
-                                             .column.get()))
-                                    ->get_element(idx));
-                } else {
-                    // If the control flow reaches this branch, the column 
neither has default value
-                    // nor is nullable. It means that the row's delete sign is 
marked, and the value
-                    // columns are useless and won't be read. So we can just 
put arbitary values in the cells
-                    mutable_full_columns[cids_missing[i]]->insert_default();
-                }
+Status SegmentWriter::merge_rows_for_sequence_column(
+        const vectorized::Block* block, size_t row_pos, size_t& num_rows,
+        std::vector<BitmapValue>* skip_bitmaps,
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+        vectorized::IOlapColumnDataAccessor* seq_column,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    auto* tablet = static_cast<Tablet*>(_tablet.get());
+    auto seq_col_unique_id = 
_tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id();
+    std::string previous_key {};
+    bool previous_has_seq_col {false};
+    int duplicate_keys {0};
+
+    auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1);
+    auto* __restrict filter_map = filter_column->get_data().data();
+
+    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; 
block_pos++) {
+        size_t delta_pos = block_pos - row_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
+        Status st;
+        if (delta_pos > 0 && previous_key == key) {
+            DCHECK(previous_has_seq_col == !row_has_sequence_col);
+            ++duplicate_keys;
+            RowLocation loc;
+            RowsetSharedPtr rowset;
+            size_t rid_missing_seq {};
+            size_t rid_with_seq {};
+            if (row_has_sequence_col) {
+                rid_missing_seq = block_pos - 1;
+                rid_with_seq = block_pos;
+            } else {
+                rid_missing_seq = block_pos;
+                rid_with_seq = block_pos - 1;
+            }
+            std::string previous_encoded_seq_value {};
+
+            st = tablet->lookup_row_key(key, _tablet_schema.get(), false, 
specified_rowsets, &loc,
+                                        _mow_context->max_version, 
segment_caches, &rowset, true,
+                                        &previous_encoded_seq_value);
+
+            Slice previous_seq_slice {};
+            if (st.is<KEY_NOT_FOUND>()) {
+                // TODO: handle default value
+            } else {
+                _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+            }
+            std::string cur_encoded_seq_value {};
+            _encode_seq_column(seq_column, rid_with_seq, 
&cur_encoded_seq_value);
+            int res = Slice {previous_encoded_seq_value}.compare(Slice 
{cur_encoded_seq_value});
+            VLOG_DEBUG << fmt::format(
+                    "SegmentWriter::merge_rows_for_sequence_column: 
rid_with_seq={}, "
+                    "rid_missing_seq={}, res={}",
+                    rid_with_seq, rid_missing_seq, res);
+            if (res > 0) {
+                filter_map[rid_with_seq] = 0;
+            } else if (res < 0) {
+                filter_map[rid_missing_seq] = 0;
+            } else {
+                filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0;
             }
-            continue;
         }
-        auto pos_in_old_block = read_index[idx + segment_start_pos];
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            mutable_full_columns[cids_missing[i]]->insert_from(
-                    
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
-                    pos_in_old_block);
+        previous_key = std::move(key);
+        previous_has_seq_col = row_has_sequence_col;
+    }
+    if (duplicate_keys > 0) {
+        auto num_cols = block->columns();
+        auto* mutable_block = const_cast<vectorized::Block*>(block);
+        mutable_block->insert({std::move(filter_column),
+                               std::make_shared<vectorized::DataTypeUInt8>(),
+                               "__dup_key_filter_col__"});
+        RETURN_IF_ERROR(vectorized::Block::filter_block(mutable_block, 
num_cols, num_cols));
+        int merged_rows = num_rows - mutable_block->rows();
+        if (duplicate_keys != merged_rows) {
+            auto msg = fmt::format(
+                    "duplicate_keys != merged_rows, duplicate_keys={}, 
merged_rows={}, "
+                    "num_rows={}, mutable_block->rows()={}",
+                    duplicate_keys, merged_rows, num_rows, block->rows());
+            DCHECK(false) << msg;
+            return Status::InternalError<false>(msg);
         }
+        num_rows = mutable_block->rows();
     }
     return Status::OK();
 }
 
 Status SegmentWriter::append_block(const vectorized::Block* block, size_t 
row_pos,

Review Comment:
   warning: function 'append_block' has cognitive complexity of 62 (threshold 
50) [readability-function-cognitive-complexity]
   ```cpp
   Status SegmentWriter::append_block(const vectorized::Block* block, size_t 
row_pos,
                         ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:919:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_opts.rowset_ctx->partial_update_info &&
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:921:** +1
   ```cpp
           _opts.write_type == DataWriteType::TYPE_DIRECT &&
                                                          ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:923:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if 
(_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:924:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(append_block_with_partial_content(block, 
row_pos, num_rows));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:924:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(append_block_with_partial_content(block, 
row_pos, num_rows));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:925:** +1, nesting level 
increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:926:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(append_block_with_flexible_partial_content(block, row_pos, 
num_rows));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:926:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               
RETURN_IF_ERROR(append_block_with_flexible_partial_content(block, row_pos, 
num_rows));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:935:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_tablet_schema->store_row_column() &&
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:945:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_has_key) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:950:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:950:** +1
   ```cpp
           if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) {
                                                ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:953:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (_short_key_row_pos + _opts.num_rows_per_block < 
_num_rows_written + num_rows) {
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:978:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_has_key) {
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:986:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (need_primary_key_indexes && !need_short_key_indexes) { // mow 
table without cluster keys
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:986:** +1
   ```cpp
           if (need_primary_key_indexes && !need_short_key_indexes) { // mow 
table without cluster keys
                                        ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:987:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, 
key_columns, seq_column,
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:987:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, 
key_columns, seq_column,
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:989:** +1, nesting level 
increased to 2
   ```cpp
           } else if (!need_primary_key_indexes && need_short_key_indexes) { // 
other tables
                  ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:989:** +1
   ```cpp
           } else if (!need_primary_key_indexes && need_short_key_indexes) { // 
other tables
                                                ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:990:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, 
short_key_pos));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:990:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, 
short_key_pos));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:991:** +1, nesting level 
increased to 2
   ```cpp
           } else if (need_primary_key_indexes && need_short_key_indexes) { // 
mow with cluster keys
                  ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:991:** +1
   ```cpp
           } else if (need_primary_key_indexes && need_short_key_indexes) { // 
mow with cluster keys
                                               ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:993:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, 
key_columns,
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:993:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, 
key_columns,
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:1010:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, 
short_key_pos));
               ^
   ```
   **be/src/common/status.h:619:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/olap/rowset/segment_v2/segment_writer.cpp:1010:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, 
short_key_pos));
               ^
   ```
   **be/src/common/status.h:621:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to