github-actions[bot] commented on code in PR #22504:
URL: https://github.com/apache/doris/pull/22504#discussion_r1333846546


##########
be/src/olap/rowset/beta_rowset_writer_v2.h:
##########
@@ -120,6 +120,11 @@ class BetaRowsetWriterV2 : public RowsetWriter {
         return Status::OK();
     }
 
+    // currently, a rowset cantains at most one segment, so we just return the 
segment's indicator maps
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {

Review Comment:
   warning: function 'get_indicator_maps' should be marked [[nodiscard]] 
[modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const 
override {
   ```
   



##########
be/src/exec/tablet_info.h:
##########
@@ -90,6 +90,7 @@ class OlapTableSchemaParam {
         return _partial_update_input_columns;
     }
     bool is_strict_mode() const { return _is_strict_mode; }
+    bool is_unique_key_replace_if_not_null() const { return 
_is_unique_key_replace_if_not_null; }

Review Comment:
   warning: function 'is_unique_key_replace_if_not_null' should be marked 
[[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_unique_key_replace_if_not_null() const { return 
_is_unique_key_replace_if_not_null; }
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, 
seg_it.first, rids,
-                                                                  
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                         
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& 
indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // 
fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, 
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |1 |1 |1 |1 |1
+//  2 |2 |2 |2 |2 |2 |2
+//  3 |3 |3 |3 |3 |3 |3
+//  4 |4 |4 |4 |4 |4 |4
+//  5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Charactor `?` means 
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num   k1|k2|v1|v2|v3
+// 1         1 |1 |10|10|10
+// 2         2 |2 |? |20|20
+// 3         3 |3 |30|30|?
+// 4         4 |4 |40|40|40
+// 5         5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+//       old_full_read_columns = [v4, v5], the old values from the previous 
rows will be read into these columns.
+//       old_point_read_columns = [k1, k2, v1, v2, v3], the old values from 
the previous rows will be read into these columns
+// if the correspoding columns in the input block has cell with indicator 
value.
+// Becase the column is immutable, filled_including_value_columns will store 
the data merged from
+// the original input block and old_point_read_columns. After the insertion, 
the data in the table will be:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |10|10|10|1 |1
+//  2 |2 |2 |20|20|2 |2
+//  3 |3 |30|30|3 |3 |3
+//  4 |4 |40|40|40|4 |4
+//  5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const 
std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, bool 
is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();
+    vectorized::Block old_full_read_block = 
_tablet_schema->create_missing_columns_block();
+    vectorized::MutableColumns old_full_read_columns = 
old_full_read_block.mutate_columns();

Review Comment:
   warning: variable 'old_full_read_columns' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       vectorized::MutableColumns old_full_read_columns = 0 = 
old_full_read_block.mutate_columns();
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, 
seg_it.first, rids,
-                                                                  
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                         
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& 
indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // 
fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, 
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |1 |1 |1 |1 |1
+//  2 |2 |2 |2 |2 |2 |2
+//  3 |3 |3 |3 |3 |3 |3
+//  4 |4 |4 |4 |4 |4 |4
+//  5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Charactor `?` means 
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num   k1|k2|v1|v2|v3
+// 1         1 |1 |10|10|10
+// 2         2 |2 |? |20|20
+// 3         3 |3 |30|30|?
+// 4         4 |4 |40|40|40
+// 5         5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+//       old_full_read_columns = [v4, v5], the old values from the previous 
rows will be read into these columns.
+//       old_point_read_columns = [k1, k2, v1, v2, v3], the old values from 
the previous rows will be read into these columns
+// if the correspoding columns in the input block has cell with indicator 
value.
+// Becase the column is immutable, filled_including_value_columns will store 
the data merged from
+// the original input block and old_point_read_columns. After the insertion, 
the data in the table will be:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |10|10|10|1 |1
+//  2 |2 |2 |20|20|2 |2
+//  3 |3 |30|30|3 |3 |3
+//  4 |4 |40|40|40|4 |4
+//  5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const 
std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, bool 
is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();
+    vectorized::Block old_full_read_block = 
_tablet_schema->create_missing_columns_block();
+    vectorized::MutableColumns old_full_read_columns = 
old_full_read_block.mutate_columns();
+    vectorized::Block old_point_read_block = 
_tablet_schema->create_block_by_cids(cids_point_read);
+    vectorized::MutableColumns old_point_read_columns = 
old_point_read_block.mutate_columns();

Review Comment:
   warning: variable 'old_point_read_columns' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       vectorized::MutableColumns old_point_read_columns = 0 = 
old_point_read_block.mutate_columns();
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.h:
##########
@@ -128,9 +130,15 @@ class SegmentWriter {
     void clear();
 
     void set_mow_context(std::shared_ptr<MowContext> mow_context);
-    Status fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
+    Status fill_missing_columns(vectorized::Block* full_block,
+                                const PartialUpdateReadPlan& read_plan,
+                                const std::vector<uint32_t>& cids_full_read,
+                                const std::vector<uint32_t>& cids_point_read,
                                 const std::vector<bool>& 
use_default_or_null_flag,
-                                bool has_default_or_nullable, const size_t& 
segment_start_pos);
+                                bool has_default_or_nullable, const size_t& 
segment_start_pos,
+                                bool is_unique_key_replace_if_not_null);
+
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const { return 
_indicator_maps; }

Review Comment:
   warning: function 'get_indicator_maps' should be marked [[nodiscard]] 
[modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const 
{ return _indicator_maps; }
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, 
seg_it.first, rids,
-                                                                  
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                         
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& 
indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // 
fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, 
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |1 |1 |1 |1 |1
+//  2 |2 |2 |2 |2 |2 |2
+//  3 |3 |3 |3 |3 |3 |3
+//  4 |4 |4 |4 |4 |4 |4
+//  5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Charactor `?` means 
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num   k1|k2|v1|v2|v3
+// 1         1 |1 |10|10|10
+// 2         2 |2 |? |20|20
+// 3         3 |3 |30|30|?
+// 4         4 |4 |40|40|40
+// 5         5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+//       old_full_read_columns = [v4, v5], the old values from the previous 
rows will be read into these columns.
+//       old_point_read_columns = [k1, k2, v1, v2, v3], the old values from 
the previous rows will be read into these columns
+// if the correspoding columns in the input block has cell with indicator 
value.
+// Becase the column is immutable, filled_including_value_columns will store 
the data merged from
+// the original input block and old_point_read_columns. After the insertion, 
the data in the table will be:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |10|10|10|1 |1
+//  2 |2 |2 |20|20|2 |2
+//  3 |3 |30|30|3 |3 |3
+//  4 |4 |40|40|40|4 |4
+//  5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const 
std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, bool 
is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();
+    vectorized::Block old_full_read_block = 
_tablet_schema->create_missing_columns_block();
+    vectorized::MutableColumns old_full_read_columns = 
old_full_read_block.mutate_columns();
+    vectorized::Block old_point_read_block = 
_tablet_schema->create_block_by_cids(cids_point_read);
+    vectorized::MutableColumns old_point_read_columns = 
old_point_read_block.mutate_columns();
+    vectorized::MutableColumns filled_including_value_columns =
+            old_point_read_block.clone_empty_columns(); // used to hold data 
after being filled
+    // !!NOTE: columns in old_point_read_block may have different row nums!
+
+    // rowid in input block -> line num in old_full_read_block
+    std::map<uint32_t, uint32_t> missing_cols_read_index;
+    // partial update cid -> (rowid in input block -> line num in 
old_point_read_block)
+    std::map<uint32_t, std::map<uint32_t, uint32_t>> 
parital_update_cols_read_index;
+
+    if (is_unique_key_replace_if_not_null) {
+        RETURN_IF_ERROR(_tablet->read_columns_by_plan(
+                _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read, 
&cids_point_read,
+                &old_full_read_block, &old_point_read_block, 
&missing_cols_read_index,
+                &parital_update_cols_read_index));
+    } else {
+        RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema, 
_rsid_to_rowset, read_plan,
+                                                      &cids_full_read, 
&old_full_read_block,
+                                                      
&missing_cols_read_index));
+    }
+
     // build default value columns
-    auto default_value_block = old_value_block.clone_empty();
+    auto default_value_block = old_full_read_block.clone_empty();
     auto mutable_default_value_columns = default_value_block.mutate_columns();
     if (has_default_or_nullable) {
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            const auto& column = _tablet_schema->column(cids_missing[i]);
+        for (auto i = 0; i < cids_full_read.size(); ++i) {
+            const auto& column = _tablet_schema->column(cids_full_read[i]);
             if (column.has_default_value()) {
-                auto default_value = 
_tablet_schema->column(cids_missing[i]).default_value();
+                auto default_value = 
_tablet_schema->column(cids_full_read[i]).default_value();
                 vectorized::ReadBuffer 
rb(const_cast<char*>(default_value.c_str()),
                                           default_value.size());
-                old_value_block.get_by_position(i).type->from_string(
+                old_full_read_block.get_by_position(i).type->from_string(
                         rb, mutable_default_value_columns[i].get());
             }
         }
     }
 
-    // fill all missing value from mutable_old_columns, need to consider 
default value and null value
+    // fill all missing value from old_value_columns, need to consider default 
value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         if (use_default_or_null_flag[idx]) {
-            for (auto i = 0; i < cids_missing.size(); ++i) {
+            for (auto i = 0; i < cids_full_read.size(); ++i) {
                 // if the column has default value, fiil it with default value
                 // otherwise, if the column is nullable, fill it with null 
value
-                const auto& tablet_column = 
_tablet_schema->column(cids_missing[i]);
+                const auto& tablet_column = 
_tablet_schema->column(cids_full_read[i]);
                 if (tablet_column.has_default_value()) {
-                    mutable_full_columns[cids_missing[i]]->insert_from(
+                    full_columns[cids_full_read[i]]->insert_from(
                             *mutable_default_value_columns[i].get(), 0);
                 } else if (tablet_column.is_nullable()) {
                     auto nullable_column = 
assert_cast<vectorized::ColumnNullable*>(
-                            mutable_full_columns[cids_missing[i]].get());
+                            full_columns[cids_full_read[i]].get());
                     nullable_column->insert_null_elements(1);
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
                     // columns are useless and won't be read. So we can just 
put arbitary values in the cells
-                    mutable_full_columns[cids_missing[i]]->insert_default();
+                    full_columns[cids_full_read[i]]->insert_default();
                 }
             }
             continue;
         }
-        auto pos_in_old_block = read_index[idx + segment_start_pos];
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            mutable_full_columns[cids_missing[i]]->insert_from(
-                    
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
-                    pos_in_old_block);
+
+        //TODO(bobhan1): fix me later(the wrong row pos)
+        // TODO(bobhan1): handle row store column here!!!
+        uint32_t pos_in_old_block = missing_cols_read_index[idx + 
segment_start_pos];
+        for (auto i = 0; i < cids_full_read.size(); ++i) {
+            uint32_t cid = cids_full_read[i];
+            if (full_block->get_by_position(cid).name != 
BeConsts::ROW_STORE_COL) {
+                full_columns[cid]->insert_from(
+                        
*old_full_read_block.get_columns_with_type_and_name()[i].column.get(),
+                        pos_in_old_block);
+            }
+        }
+
+        if (is_unique_key_replace_if_not_null) {
+            //TODO(bobhan1): fix me later(the wrong row pos)
+            for (size_t i = 0; i < cids_point_read.size(); i++) {
+                uint32_t cid = cids_point_read[i];
+                if (parital_update_cols_read_index[cid].contains(idx + 
segment_start_pos)) {
+                    // cells with indicator value should be replaced with old 
values in previous rows
+                    uint32_t pos_in_old_block =

Review Comment:
   warning: variable 'pos_in_old_block' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
                       uint32_t pos_in_old_block = 0 =
   ```
   



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const 
DataTypeSerDeSPtrs& serdes, const
     }
 }
 
+void JsonbSerializeUtil::jsonb_to_block(
+        const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& 
serdes_point_read,
+        const char* data, size_t size,
+        const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read,
+        const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>&
+                col_uid_to_idx_cid_point_read,
+        std::unordered_map<uint32_t, bool>& point_read_cids, Block& 
block_full_read,
+        Block& block_point_read) {
+    JsonbDocument& doc = *JsonbDocument::createDocument(data, size);
+    size_t full_read_filled_columns = 0;
+    for (auto it = doc->begin(); it != doc->end(); ++it) {
+        auto col_it = col_uid_to_idx_full_read.find(it->getKeyId());
+        if (col_it != col_uid_to_idx_full_read.end()) {
+            MutableColumnPtr dst_column =
+                    
block_full_read.get_by_position(col_it->second).column->assume_mutable();
+            
serdes_full_read[col_it->second]->read_one_cell_from_jsonb(*dst_column, 
it->value());
+            ++full_read_filled_columns;
+        } else {
+            auto it2 = col_uid_to_idx_cid_point_read.find(it->getKeyId());
+            if (it2 != col_uid_to_idx_cid_point_read.end()) {
+                uint32_t cid = it2->second.second;
+                uint32_t idx = it2->second.first;
+                auto it3 = point_read_cids.find(cid);
+                if (it3 != point_read_cids.end()) {
+                    MutableColumnPtr dst_column =

Review Comment:
   warning: variable 'dst_column' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
                       MutableColumnPtr dst_column = 0 =
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -332,25 +335,69 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write);
 
-    // find missing column cids
+    // all the cells in missing_cids will be filled from the old rows
     std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
     std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+    std::vector<uint32_t> determistic_cids; // columns that dont't need read 
from old rows
+    std::vector<uint32_t> point_read_cids;
 
     // create full block and fill with input columns
     auto full_block = _tablet_schema->create_block();
-    size_t input_id = 0;
-    for (auto i : including_cids) {
-        full_block.replace_by_position(i, 
block->get_by_position(input_id++).column);
+
+    // indictors to apply partial updates for including columns,
+    // cells with `indicator` values should be read from the old rows
+    // currently we use treat null value as `indicator`
+    IndicatorMapsVertical indicator_maps_vertical; // cid -> rows
+
+    PartialUpdateReadPlan read_plan;
+    if (_tablet_schema->store_row_column()) {
+        read_plan = RowStoreReadPlan {};
+    }
+
+    bool is_unique_key_replace_if_not_null = 
_tablet_schema->is_unique_key_replace_if_not_null();

Review Comment:
   warning: variable 'is_unique_key_replace_if_not_null' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       bool is_unique_key_replace_if_not_null = false = 
_tablet_schema->is_unique_key_replace_if_not_null();
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, 
seg_it.first, rids,
-                                                                  
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                         
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& 
indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // 
fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, 
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |1 |1 |1 |1 |1
+//  2 |2 |2 |2 |2 |2 |2
+//  3 |3 |3 |3 |3 |3 |3
+//  4 |4 |4 |4 |4 |4 |4
+//  5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Charactor `?` means 
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num   k1|k2|v1|v2|v3
+// 1         1 |1 |10|10|10
+// 2         2 |2 |? |20|20
+// 3         3 |3 |30|30|?
+// 4         4 |4 |40|40|40
+// 5         5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+//       old_full_read_columns = [v4, v5], the old values from the previous 
rows will be read into these columns.
+//       old_point_read_columns = [k1, k2, v1, v2, v3], the old values from 
the previous rows will be read into these columns
+// if the correspoding columns in the input block has cell with indicator 
value.
+// Becase the column is immutable, filled_including_value_columns will store 
the data merged from
+// the original input block and old_point_read_columns. After the insertion, 
the data in the table will be:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |10|10|10|1 |1
+//  2 |2 |2 |20|20|2 |2
+//  3 |3 |30|30|3 |3 |3
+//  4 |4 |40|40|40|4 |4
+//  5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const 
std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, bool 
is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();
+    vectorized::Block old_full_read_block = 
_tablet_schema->create_missing_columns_block();
+    vectorized::MutableColumns old_full_read_columns = 
old_full_read_block.mutate_columns();
+    vectorized::Block old_point_read_block = 
_tablet_schema->create_block_by_cids(cids_point_read);
+    vectorized::MutableColumns old_point_read_columns = 
old_point_read_block.mutate_columns();
+    vectorized::MutableColumns filled_including_value_columns =

Review Comment:
   warning: variable 'filled_including_value_columns' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       vectorized::MutableColumns filled_including_value_columns = 0 =
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -332,25 +335,69 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write);
 
-    // find missing column cids
+    // all the cells in missing_cids will be filled from the old rows
     std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
     std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+    std::vector<uint32_t> determistic_cids; // columns that dont't need read 
from old rows
+    std::vector<uint32_t> point_read_cids;
 
     // create full block and fill with input columns
     auto full_block = _tablet_schema->create_block();
-    size_t input_id = 0;
-    for (auto i : including_cids) {
-        full_block.replace_by_position(i, 
block->get_by_position(input_id++).column);
+
+    // indictors to apply partial updates for including columns,
+    // cells with `indicator` values should be read from the old rows
+    // currently we use treat null value as `indicator`
+    IndicatorMapsVertical indicator_maps_vertical; // cid -> rows
+
+    PartialUpdateReadPlan read_plan;

Review Comment:
   warning: variable 'read_plan' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       PartialUpdateReadPlan read_plan = 0;
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -332,25 +335,69 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     }
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write);
 
-    // find missing column cids
+    // all the cells in missing_cids will be filled from the old rows
     std::vector<uint32_t> missing_cids = _tablet_schema->get_missing_cids();
     std::vector<uint32_t> including_cids = _tablet_schema->get_update_cids();
+    std::vector<uint32_t> determistic_cids; // columns that dont't need read 
from old rows
+    std::vector<uint32_t> point_read_cids;
 
     // create full block and fill with input columns
     auto full_block = _tablet_schema->create_block();
-    size_t input_id = 0;
-    for (auto i : including_cids) {
-        full_block.replace_by_position(i, 
block->get_by_position(input_id++).column);
+
+    // indictors to apply partial updates for including columns,
+    // cells with `indicator` values should be read from the old rows
+    // currently we use treat null value as `indicator`
+    IndicatorMapsVertical indicator_maps_vertical; // cid -> rows

Review Comment:
   warning: variable 'indicator_maps_vertical' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       IndicatorMapsVertical indicator_maps_vertical = 0; // cid -> rows
   ```
   



##########
be/src/olap/rowset/beta_rowset_writer.h:
##########
@@ -102,6 +102,11 @@ class BetaRowsetWriter : public RowsetWriter {
 
     int64_t num_rows_filtered() const override { return 
_segment_creator.num_rows_filtered(); }
 
+    // currently, a rowset cantains at most one segment, so we just return the 
segment's indicator maps
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const override {

Review Comment:
   warning: function 'get_indicator_maps' should be marked [[nodiscard]] 
[modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const 
override {
   ```
   



##########
be/src/olap/rowset/rowset_writer.h:
##########
@@ -131,6 +131,8 @@ class RowsetWriter {
 
     virtual int64_t num_rows_filtered() const = 0;
 
+    virtual std::shared_ptr<IndicatorMaps> get_indicator_maps() const = 0;

Review Comment:
   warning: function 'get_indicator_maps' should be marked [[nodiscard]] 
[modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] virtual std::shared_ptr<IndicatorMaps> 
get_indicator_maps() const = 0;
   ```
   



##########
be/src/olap/rowset/segment_creator.h:
##########
@@ -166,6 +170,10 @@ class SegmentCreator {
 
     int64_t num_rows_filtered() const { return 
_segment_flusher.num_rows_filtered(); }
 
+    std::shared_ptr<IndicatorMaps> get_indicator_maps() const {

Review Comment:
   warning: function 'get_indicator_maps' should be marked [[nodiscard]] 
[modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] std::shared_ptr<IndicatorMaps> get_indicator_maps() const {
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, 
seg_it.first, rids,
-                                                                  
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                         
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& 
indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // 
fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, 
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |1 |1 |1 |1 |1
+//  2 |2 |2 |2 |2 |2 |2
+//  3 |3 |3 |3 |3 |3 |3
+//  4 |4 |4 |4 |4 |4 |4
+//  5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Charactor `?` means 
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num   k1|k2|v1|v2|v3
+// 1         1 |1 |10|10|10
+// 2         2 |2 |? |20|20
+// 3         3 |3 |30|30|?
+// 4         4 |4 |40|40|40
+// 5         5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+//       old_full_read_columns = [v4, v5], the old values from the previous 
rows will be read into these columns.
+//       old_point_read_columns = [k1, k2, v1, v2, v3], the old values from 
the previous rows will be read into these columns
+// if the correspoding columns in the input block has cell with indicator 
value.
+// Becase the column is immutable, filled_including_value_columns will store 
the data merged from
+// the original input block and old_point_read_columns. After the insertion, 
the data in the table will be:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |10|10|10|1 |1
+//  2 |2 |2 |20|20|2 |2
+//  3 |3 |30|30|3 |3 |3
+//  4 |4 |40|40|40|4 |4
+//  5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const 
std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, bool 
is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();

Review Comment:
   warning: variable 'full_columns' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
       vectorized::MutableColumns full_columns = 0 = 
full_block->mutate_columns();
   ```
   



##########
be/src/olap/tablet.cpp:
##########
@@ -2687,20 +2688,40 @@ Status 
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
                                                  &column_iterator, &stats));
     // get and parse tuple row
     vectorized::MutableColumnPtr column_ptr = 
vectorized::ColumnString::create();
-    RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 
rowids.size(), column_ptr));
-    assert(column_ptr->size() == rowids.size());
+    RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(), 
column_ptr));
+    assert(column_ptr->size() == rids.size());
     auto string_column = 
static_cast<vectorized::ColumnString*>(column_ptr.get());
-    vectorized::DataTypeSerDeSPtrs serdes;
-    serdes.resize(cids.size());
-    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
-    for (int i = 0; i < cids.size(); ++i) {
-        const TabletColumn& column = tablet_schema->column(cids[i]);
+    vectorized::DataTypeSerDeSPtrs serdes_full_read;
+    serdes_full_read.resize(cids_full_read->size());

Review Comment:
   warning: variable 'serdes_full_read' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
   ypeSerDeSPtrs serdes_full_read = 0;
   ```
   



##########
be/src/olap/tablet_schema.h:
##########
@@ -363,8 +364,12 @@ class TabletSchema {
     }
     void set_is_strict_mode(bool is_strict_mode) { _is_strict_mode = 
is_strict_mode; }
     bool is_strict_mode() const { return _is_strict_mode; }
-    std::vector<uint32_t> get_missing_cids() const { return _missing_cids; }
-    std::vector<uint32_t> get_update_cids() const { return _update_cids; }
+    void set_is_unique_key_replace_if_not_null(bool 
is_unique_key_replace_if_not_null) {
+        _is_unique_key_replace_if_not_null = is_unique_key_replace_if_not_null;
+    }
+    bool is_unique_key_replace_if_not_null() const { return 
_is_unique_key_replace_if_not_null; }

Review Comment:
   warning: function 'is_unique_key_replace_if_not_null' should be marked 
[[nodiscard]] [modernize-use-nodiscard]
   
   ```suggestion
       [[nodiscard]] bool is_unique_key_replace_if_not_null() const { return 
_is_unique_key_replace_if_not_null; }
   ```
   



##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -539,93 +603,150 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     return Status::OK();
 }
 
-Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& 
mutable_full_columns,
-                                           const std::vector<bool>& 
use_default_or_null_flag,
-                                           bool has_default_or_nullable,
-                                           const size_t& segment_start_pos) {
-    // create old value columns
-    auto old_value_block = _tablet_schema->create_missing_columns_block();
-    std::vector<uint32_t> cids_missing = _tablet_schema->get_missing_cids();
-    CHECK(cids_missing.size() == old_value_block.columns());
-    auto mutable_old_columns = old_value_block.mutate_columns();
-    bool has_row_column = _tablet_schema->store_row_column();
-    // record real pos, key is input line num, value is old_block line num
-    std::map<uint32_t, uint32_t> read_index;
-    size_t read_idx = 0;
-    for (auto rs_it : _rssid_to_rid) {
-        for (auto seg_it : rs_it.second) {
-            auto rowset = _rsid_to_rowset[rs_it.first];
-            CHECK(rowset);
-            std::vector<uint32_t> rids;
-            for (auto id_and_pos : seg_it.second) {
-                rids.emplace_back(id_and_pos.rid);
-                read_index[id_and_pos.pos] = read_idx++;
-            }
-            if (has_row_column) {
-                auto st = _tablet->fetch_value_through_row_column(rowset, 
seg_it.first, rids,
-                                                                  
cids_missing, old_value_block);
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value through row column";
-                    return st;
-                }
-                continue;
-            }
-            for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
-                TabletColumn tablet_column = 
_tablet_schema->column(cids_missing[cid]);
-                auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, 
rids, tablet_column,
-                                                         
mutable_old_columns[cid]);
-                // set read value to output block
-                if (!st.ok()) {
-                    LOG(WARNING) << "failed to fetch value by rowids";
-                    return st;
-                }
+void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows,
+                                         const IndicatorMapsVertical& 
indicator_maps_vertical) {
+    _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // 
fixme(baohan): ?
+    for (auto [cid, indicator_map] : indicator_maps_vertical) {
+        for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) {
+            if (indicator_map != nullptr && indicator_map[pos] != 0) {
+                (*_indicator_maps)[pos].emplace_back(cid);
             }
         }
     }
+}
+
+// Consider a merge-on-write unique table with colums [k1, k2, v1, v1, v2, v3, 
v4, v5] where k1, k2 are key columns
+// and v1, v2, v3, v4, v5 are value columns. The table has the following data:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |1 |1 |1 |1 |1
+//  2 |2 |2 |2 |2 |2 |2
+//  3 |3 |3 |3 |3 |3 |3
+//  4 |4 |4 |4 |4 |4 |4
+//  5 |5 |5 |5 |5 |5 |5
+// The user inserts the following data for partial update. Charactor `?` means 
that the cell is filled
+// with indicator value(currently we use null as indicator value).
+// row_num   k1|k2|v1|v2|v3
+// 1         1 |1 |10|10|10
+// 2         2 |2 |? |20|20
+// 3         3 |3 |30|30|?
+// 4         4 |4 |40|40|40
+// 5         5 |5 |50|? |50
+// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5]
+//       old_full_read_columns = [v4, v5], the old values from the previous 
rows will be read into these columns.
+//       old_point_read_columns = [k1, k2, v1, v2, v3], the old values from 
the previous rows will be read into these columns
+// if the correspoding columns in the input block has cell with indicator 
value.
+// Becase the column is immutable, filled_including_value_columns will store 
the data merged from
+// the original input block and old_point_read_columns. After the insertion, 
the data in the table will be:
+//  k1|k2|v1|v2|v3|v4|v5
+//  1 |1 |10|10|10|1 |1
+//  2 |2 |2 |20|20|2 |2
+//  3 |3 |30|30|3 |3 |3
+//  4 |4 |40|40|40|4 |4
+//  5 |5 |50|5 |50|5 |5
+Status SegmentWriter::fill_missing_columns(
+        vectorized::Block* full_block, const PartialUpdateReadPlan& read_plan,
+        const std::vector<uint32_t>& cids_full_read, const 
std::vector<uint32_t>& cids_point_read,
+        const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
+        const size_t& segment_start_pos, bool 
is_unique_key_replace_if_not_null) {
+    vectorized::MutableColumns full_columns = full_block->mutate_columns();
+    vectorized::Block old_full_read_block = 
_tablet_schema->create_missing_columns_block();
+    vectorized::MutableColumns old_full_read_columns = 
old_full_read_block.mutate_columns();
+    vectorized::Block old_point_read_block = 
_tablet_schema->create_block_by_cids(cids_point_read);
+    vectorized::MutableColumns old_point_read_columns = 
old_point_read_block.mutate_columns();
+    vectorized::MutableColumns filled_including_value_columns =
+            old_point_read_block.clone_empty_columns(); // used to hold data 
after being filled
+    // !!NOTE: columns in old_point_read_block may have different row nums!
+
+    // rowid in input block -> line num in old_full_read_block
+    std::map<uint32_t, uint32_t> missing_cols_read_index;
+    // partial update cid -> (rowid in input block -> line num in 
old_point_read_block)
+    std::map<uint32_t, std::map<uint32_t, uint32_t>> 
parital_update_cols_read_index;
+
+    if (is_unique_key_replace_if_not_null) {
+        RETURN_IF_ERROR(_tablet->read_columns_by_plan(
+                _tablet_schema, _rsid_to_rowset, read_plan, &cids_full_read, 
&cids_point_read,
+                &old_full_read_block, &old_point_read_block, 
&missing_cols_read_index,
+                &parital_update_cols_read_index));
+    } else {
+        RETURN_IF_ERROR(_tablet->read_columns_by_plan(_tablet_schema, 
_rsid_to_rowset, read_plan,
+                                                      &cids_full_read, 
&old_full_read_block,
+                                                      
&missing_cols_read_index));
+    }
+
     // build default value columns
-    auto default_value_block = old_value_block.clone_empty();
+    auto default_value_block = old_full_read_block.clone_empty();
     auto mutable_default_value_columns = default_value_block.mutate_columns();
     if (has_default_or_nullable) {
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            const auto& column = _tablet_schema->column(cids_missing[i]);
+        for (auto i = 0; i < cids_full_read.size(); ++i) {
+            const auto& column = _tablet_schema->column(cids_full_read[i]);
             if (column.has_default_value()) {
-                auto default_value = 
_tablet_schema->column(cids_missing[i]).default_value();
+                auto default_value = 
_tablet_schema->column(cids_full_read[i]).default_value();
                 vectorized::ReadBuffer 
rb(const_cast<char*>(default_value.c_str()),
                                           default_value.size());
-                old_value_block.get_by_position(i).type->from_string(
+                old_full_read_block.get_by_position(i).type->from_string(
                         rb, mutable_default_value_columns[i].get());
             }
         }
     }
 
-    // fill all missing value from mutable_old_columns, need to consider 
default value and null value
+    // fill all missing value from old_value_columns, need to consider default 
value and null value
     for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
         if (use_default_or_null_flag[idx]) {
-            for (auto i = 0; i < cids_missing.size(); ++i) {
+            for (auto i = 0; i < cids_full_read.size(); ++i) {
                 // if the column has default value, fiil it with default value
                 // otherwise, if the column is nullable, fill it with null 
value
-                const auto& tablet_column = 
_tablet_schema->column(cids_missing[i]);
+                const auto& tablet_column = 
_tablet_schema->column(cids_full_read[i]);
                 if (tablet_column.has_default_value()) {
-                    mutable_full_columns[cids_missing[i]]->insert_from(
+                    full_columns[cids_full_read[i]]->insert_from(
                             *mutable_default_value_columns[i].get(), 0);
                 } else if (tablet_column.is_nullable()) {
                     auto nullable_column = 
assert_cast<vectorized::ColumnNullable*>(
-                            mutable_full_columns[cids_missing[i]].get());
+                            full_columns[cids_full_read[i]].get());
                     nullable_column->insert_null_elements(1);
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
                     // columns are useless and won't be read. So we can just 
put arbitary values in the cells
-                    mutable_full_columns[cids_missing[i]]->insert_default();
+                    full_columns[cids_full_read[i]]->insert_default();
                 }
             }
             continue;
         }
-        auto pos_in_old_block = read_index[idx + segment_start_pos];
-        for (auto i = 0; i < cids_missing.size(); ++i) {
-            mutable_full_columns[cids_missing[i]]->insert_from(
-                    
*old_value_block.get_columns_with_type_and_name()[i].column.get(),
-                    pos_in_old_block);
+
+        //TODO(bobhan1): fix me later(the wrong row pos)
+        // TODO(bobhan1): handle row store column here!!!
+        uint32_t pos_in_old_block = missing_cols_read_index[idx + 
segment_start_pos];

Review Comment:
   warning: variable 'pos_in_old_block' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
           uint32_t pos_in_old_block = 0 = missing_cols_read_index[idx + 
segment_start_pos];
   ```
   



##########
be/src/olap/tablet.cpp:
##########
@@ -2687,20 +2688,40 @@ Status 
Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
                                                  &column_iterator, &stats));
     // get and parse tuple row
     vectorized::MutableColumnPtr column_ptr = 
vectorized::ColumnString::create();
-    RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 
rowids.size(), column_ptr));
-    assert(column_ptr->size() == rowids.size());
+    RETURN_IF_ERROR(column_iterator->read_by_rowids(rids.data(), rids.size(), 
column_ptr));
+    assert(column_ptr->size() == rids.size());
     auto string_column = 
static_cast<vectorized::ColumnString*>(column_ptr.get());
-    vectorized::DataTypeSerDeSPtrs serdes;
-    serdes.resize(cids.size());
-    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
-    for (int i = 0; i < cids.size(); ++i) {
-        const TabletColumn& column = tablet_schema->column(cids[i]);
+    vectorized::DataTypeSerDeSPtrs serdes_full_read;
+    serdes_full_read.resize(cids_full_read->size());
+
+    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx_full_read;
+
+    for (int i = 0; i < cids_full_read->size(); ++i) {
+        const TabletColumn& column = 
tablet_schema->column(cids_full_read->at(i));
         vectorized::DataTypePtr type =
                 
vectorized::DataTypeFactory::instance().create_data_type(column);
-        col_uid_to_idx[column.unique_id()] = i;
-        serdes[i] = type->get_serde();
+        col_uid_to_idx_full_read[column.unique_id()] = i;
+        serdes_full_read[i] = type->get_serde();
+    }
+
+    if (with_point_read) {
+        vectorized::DataTypeSerDeSPtrs serdes_point_read;
+        serdes_point_read.resize(cids_point_read->size());

Review Comment:
   warning: variable 'serdes_point_read' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
   peSerDeSPtrs serdes_point_read = 0;
   ```
   



##########
be/src/olap/tablet.cpp:
##########
@@ -3084,110 +3118,278 @@ Status Tablet::generate_new_block_for_partial_update(
         TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& 
read_plan_ori,
         const PartialUpdateReadPlan& read_plan_update,
         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
+        std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> 
indicator_maps,
         vectorized::Block* output_block) {
     // do partial update related works
     // 1. read columns by read plan
     // 2. generate new block
     // 3. write a new segment and modify rowset meta
     // 4. mark current keys deleted
     CHECK(output_block);
-    auto full_mutable_columns = output_block->mutate_columns();
-    auto old_block = rowset_schema->create_missing_columns_block();
-    auto missing_cids = rowset_schema->get_missing_cids();
-    auto update_block = rowset_schema->create_update_columns_block();
-    auto update_cids = rowset_schema->get_update_cids();
-
-    std::map<uint32_t, uint32_t> read_index_old;
-    RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, 
read_plan_ori, rsid_to_rowset,
-                                         old_block, &read_index_old));
-
-    std::map<uint32_t, uint32_t> read_index_update;
-    RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, 
read_plan_update,
-                                         rsid_to_rowset, update_block, 
&read_index_update));
+    if (!indicator_maps) {
+        auto full_mutable_columns = output_block->mutate_columns();
+        auto old_block = rowset_schema->create_missing_columns_block();
+        auto missing_cids = rowset_schema->get_missing_cids();
+        auto update_block = rowset_schema->create_update_columns_block();
+        auto update_cids = rowset_schema->get_update_cids();
+
+        std::map<uint32_t, uint32_t> read_index_old;
+        RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, 
read_plan_ori,
+                                             &missing_cids, &old_block, 
&read_index_old));
+
+        std::map<uint32_t, uint32_t> read_index_update;
+        RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, 
read_plan_update,
+                                             &update_cids, &update_block, 
&read_index_update));
+
+        // build full block
+        CHECK(read_index_old.size() == read_index_update.size());
+        for (auto i = 0; i < missing_cids.size(); ++i) {
+            for (auto idx = 0; idx < read_index_old.size(); ++idx) {
+                full_mutable_columns[missing_cids[i]]->insert_from(
+                        
*old_block.get_columns_with_type_and_name()[i].column.get(),
+                        read_index_old[idx]);
+            }
+        }
+        for (auto i = 0; i < update_cids.size(); ++i) {
+            for (auto idx = 0; idx < read_index_update.size(); ++idx) {
+                full_mutable_columns[update_cids[i]]->insert_from(
+                        
*update_block.get_columns_with_type_and_name()[i].column.get(),
+                        read_index_update[idx]);
+            }
+        }
+        VLOG_DEBUG << "full block when publish: " << output_block->dump_data();
+    } else {
+        std::unordered_set<uint32_t> cids_point_read;
+        for (const auto& [_, cids] : *indicator_maps) {

Review Comment:
   warning: variable 'cids_point_read' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
   _set<uint32_t> cids_point_read = 0;
   ```
   



##########
be/src/vec/jsonb/serialize.cpp:
##########
@@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const 
DataTypeSerDeSPtrs& serdes, const
     }
 }
 
+void JsonbSerializeUtil::jsonb_to_block(
+        const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& 
serdes_point_read,
+        const char* data, size_t size,
+        const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read,
+        const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>&
+                col_uid_to_idx_cid_point_read,
+        std::unordered_map<uint32_t, bool>& point_read_cids, Block& 
block_full_read,
+        Block& block_point_read) {
+    JsonbDocument& doc = *JsonbDocument::createDocument(data, size);
+    size_t full_read_filled_columns = 0;
+    for (auto it = doc->begin(); it != doc->end(); ++it) {
+        auto col_it = col_uid_to_idx_full_read.find(it->getKeyId());
+        if (col_it != col_uid_to_idx_full_read.end()) {
+            MutableColumnPtr dst_column =

Review Comment:
   warning: variable 'dst_column' is not initialized 
[cppcoreguidelines-init-variables]
   
   ```suggestion
               MutableColumnPtr dst_column = 0 =
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to