dataroaring commented on code in PR #41701:
URL: https://github.com/apache/doris/pull/41701#discussion_r1818887847


##########
be/src/olap/partial_update_info.cpp:
##########
@@ -658,4 +664,422 @@ Status 
FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
     return Status::OK();
 }
 
+BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& 
vertical_segment_writer)
+        : _writer(vertical_segment_writer), 
_tablet_schema(*_writer._tablet_schema) {}
+
+void BlockAggregator::merge_one_row(vectorized::MutableBlock& dst_block,
+                                    vectorized::Block* src_block, int64_t rid,
+                                    BitmapValue& skip_bitmap) {
+    for (size_t cid {_tablet_schema.num_key_columns()}; cid < 
_tablet_schema.num_columns(); cid++) {
+        if (cid == _tablet_schema.skip_bitmap_col_idx()) {
+            auto& cur_skip_bitmap =
+                    
assert_cast<vectorized::ColumnBitmap*>(dst_block.mutable_columns()[cid].get())
+                            ->get_data()
+                            .back();
+            const auto& new_row_skip_bitmap =
+                    assert_cast<vectorized::ColumnBitmap*>(
+                            
src_block->get_by_position(cid).column->assume_mutable().get())
+                            ->get_data()[rid];
+            cur_skip_bitmap &= new_row_skip_bitmap;
+            continue;
+        }
+        if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) {
+            dst_block.mutable_columns()[cid]->pop_back(1);
+            
dst_block.mutable_columns()[cid]->insert_from(*src_block->get_by_position(cid).column,
+                                                          rid);
+        }
+    }
+    VLOG_DEBUG << fmt::format("merge a row, after merge, 
output_block.rows()={}, state: {}",
+                              dst_block.rows(), _state.to_string());
+}
+
+void BlockAggregator::append_one_row(vectorized::MutableBlock& dst_block,
+                                     vectorized::Block* src_block, int64_t 
rid) {
+    dst_block.add_row(src_block, rid);
+    _state.rows++;
+    VLOG_DEBUG << fmt::format("append a new row, after append, 
output_block.rows()={}, state: {}",
+                              dst_block.rows(), _state.to_string());
+}
+
+void BlockAggregator::remove_last_n_rows(vectorized::MutableBlock& dst_block, 
int n) {
+    if (n > 0) {
+        for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) {
+            DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n);
+            dst_block.mutable_columns()[cid]->pop_back(n);
+        }
+    }
+}
+
+void BlockAggregator::append_or_merge_row(vectorized::MutableBlock& dst_block,
+                                          vectorized::Block* src_block, 
int64_t rid,
+                                          BitmapValue& skip_bitmap, bool 
have_delete_sign) {
+    if (have_delete_sign) {
+        // remove all the previous batched rows
+        remove_last_n_rows(dst_block, _state.rows);
+        _state.rows = 0;
+        _state.has_row_with_delete_sign = true;
+
+        append_one_row(dst_block, src_block, rid);
+    } else {
+        if (_state.should_merge()) {
+            merge_one_row(dst_block, src_block, rid, skip_bitmap);
+        } else {
+            append_one_row(dst_block, src_block, rid);
+        }
+    }
+};
+
+Status BlockAggregator::aggregate_rows(
+        vectorized::MutableBlock& output_block, vectorized::Block* block, 
int64_t start,
+        int64_t end, std::string key, std::vector<BitmapValue>* skip_bitmaps,
+        const signed char* delete_signs, vectorized::IOlapColumnDataAccessor* 
seq_column,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
+    if (end - start == 1) {
+        output_block.add_row(block, start);
+        VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
+        return Status::OK();
+    }
+
+    auto seq_col_unique_id = 
_tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
+    auto delete_sign_col_unique_id =
+            
_tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
+
+    _state.reset();
+
+    RowLocation loc;
+    RowsetSharedPtr rowset;
+    std::string previous_encoded_seq_value {};
+    Status st = _writer._tablet->lookup_row_key(
+            key, &_tablet_schema, false, specified_rowsets, &loc, 
_writer._mow_context->max_version,
+            segment_caches, &rowset, true, &previous_encoded_seq_value);
+    int64_t pos = start;
+    DCHECK(st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
+
+    std::string cur_seq_val;
+    if (st.ok()) {
+        for (int64_t i {start}; i < end; i++) {
+            auto& skip_bitmap = skip_bitmaps->at(i);
+            bool row_has_sequence_col = 
(!skip_bitmap.contains(seq_col_unique_id));
+            // Find the first row that has larger sequence value than the 
existing row's
+            // or the first row that doesn't specify the sequence column.
+            // Discard all the rows before and begin to do aggregation from 
that row

Review Comment:
   // Discard all the rows whose seq is less than previous_encoded_seq_value.
   
   if (row_has_sequence_col) {
       std::string seq_val {};
       _writer._encode_seq_column(seq_column, i, &seq_val);
       if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) < 0) {
           continue;
       }
   
       cur_seq_val = std::move(seq_val);
       pos = i;
       break;
   }
   
   cur_seq_val = std::move(previous_encoded_seq_value);
   pos = i;
   break;
   
   



##########
be/src/olap/partial_update_info.cpp:
##########
@@ -658,4 +664,422 @@ Status 
FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
     return Status::OK();
 }
 
+BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& 
vertical_segment_writer)
+        : _writer(vertical_segment_writer), 
_tablet_schema(*_writer._tablet_schema) {}
+
+void BlockAggregator::merge_one_row(vectorized::MutableBlock& dst_block,
+                                    vectorized::Block* src_block, int64_t rid,
+                                    BitmapValue& skip_bitmap) {
+    for (size_t cid {_tablet_schema.num_key_columns()}; cid < 
_tablet_schema.num_columns(); cid++) {
+        if (cid == _tablet_schema.skip_bitmap_col_idx()) {
+            auto& cur_skip_bitmap =
+                    
assert_cast<vectorized::ColumnBitmap*>(dst_block.mutable_columns()[cid].get())
+                            ->get_data()
+                            .back();
+            const auto& new_row_skip_bitmap =
+                    assert_cast<vectorized::ColumnBitmap*>(
+                            
src_block->get_by_position(cid).column->assume_mutable().get())
+                            ->get_data()[rid];
+            cur_skip_bitmap &= new_row_skip_bitmap;
+            continue;
+        }
+        if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) {
+            dst_block.mutable_columns()[cid]->pop_back(1);
+            
dst_block.mutable_columns()[cid]->insert_from(*src_block->get_by_position(cid).column,
+                                                          rid);
+        }
+    }
+    VLOG_DEBUG << fmt::format("merge a row, after merge, 
output_block.rows()={}, state: {}",
+                              dst_block.rows(), _state.to_string());
+}
+
+void BlockAggregator::append_one_row(vectorized::MutableBlock& dst_block,
+                                     vectorized::Block* src_block, int64_t 
rid) {
+    dst_block.add_row(src_block, rid);
+    _state.rows++;
+    VLOG_DEBUG << fmt::format("append a new row, after append, 
output_block.rows()={}, state: {}",
+                              dst_block.rows(), _state.to_string());
+}
+
+void BlockAggregator::remove_last_n_rows(vectorized::MutableBlock& dst_block, 
int n) {
+    if (n > 0) {
+        for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) {
+            DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n);
+            dst_block.mutable_columns()[cid]->pop_back(n);
+        }
+    }
+}
+
+void BlockAggregator::append_or_merge_row(vectorized::MutableBlock& dst_block,
+                                          vectorized::Block* src_block, 
int64_t rid,
+                                          BitmapValue& skip_bitmap, bool 
have_delete_sign) {
+    if (have_delete_sign) {
+        // remove all the previous batched rows
+        remove_last_n_rows(dst_block, _state.rows);
+        _state.rows = 0;
+        _state.has_row_with_delete_sign = true;
+
+        append_one_row(dst_block, src_block, rid);
+    } else {
+        if (_state.should_merge()) {
+            merge_one_row(dst_block, src_block, rid, skip_bitmap);
+        } else {
+            append_one_row(dst_block, src_block, rid);
+        }
+    }
+};
+
+Status BlockAggregator::aggregate_rows(
+        vectorized::MutableBlock& output_block, vectorized::Block* block, 
int64_t start,
+        int64_t end, std::string key, std::vector<BitmapValue>* skip_bitmaps,
+        const signed char* delete_signs, vectorized::IOlapColumnDataAccessor* 
seq_column,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
+    if (end - start == 1) {
+        output_block.add_row(block, start);
+        VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
+        return Status::OK();
+    }
+
+    auto seq_col_unique_id = 
_tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
+    auto delete_sign_col_unique_id =
+            
_tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
+
+    _state.reset();
+
+    RowLocation loc;
+    RowsetSharedPtr rowset;
+    std::string previous_encoded_seq_value {};
+    Status st = _writer._tablet->lookup_row_key(
+            key, &_tablet_schema, false, specified_rowsets, &loc, 
_writer._mow_context->max_version,
+            segment_caches, &rowset, true, &previous_encoded_seq_value);
+    int64_t pos = start;
+    DCHECK(st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
+
+    std::string cur_seq_val;
+    if (st.ok()) {
+        for (int64_t i {start}; i < end; i++) {
+            auto& skip_bitmap = skip_bitmaps->at(i);
+            bool row_has_sequence_col = 
(!skip_bitmap.contains(seq_col_unique_id));
+            // Find the first row that has larger sequence value than the 
existing row's
+            // or the first row that doesn't specify the sequence column.
+            // Discard all the rows before and begin to do aggregation from 
that row
+            if (!row_has_sequence_col) {
+                cur_seq_val = std::move(previous_encoded_seq_value);
+                pos = i;
+                break;
+            } else {
+                std::string seq_val {};
+                _writer._encode_seq_column(seq_column, i, &seq_val);
+                if (Slice {seq_val}.compare(Slice 
{previous_encoded_seq_value}) >= 0) {
+                    cur_seq_val = std::move(seq_val);
+                    pos = i;
+                    break;
+                }
+            }
+        }
+    } else {
+        pos = start;
+        auto& skip_bitmap = skip_bitmaps->at(pos);
+        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
+        if (row_has_sequence_col) {
+            std::string seq_val {};
+            // for rows that don't specify sequence col, seq_val will be 
encoded to minimal value
+            _writer._encode_seq_column(seq_column, pos, &seq_val);
+            cur_seq_val = std::move(seq_val);
+        } else {
+            cur_seq_val.clear();
+            RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value(
+                    _tablet_schema, 
*_writer._opts.rowset_ctx->partial_update_info, &cur_seq_val));
+        }
+    }
+    VLOG_DEBUG << fmt::format("start pos={}, output_block.rows()={}", pos, 
output_block.rows());
+
+    for (int64_t rid {pos}; rid < end; rid++) {
+        auto& skip_bitmap = skip_bitmaps->at(rid);
+        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
+        bool have_delete_sign =
+                (!skip_bitmap.contains(delete_sign_col_unique_id) && 
delete_signs[rid] != 0);
+        if (!row_has_sequence_col) {
+            append_or_merge_row(output_block, block, rid, skip_bitmap, 
have_delete_sign);
+        } else {
+            std::string seq_val {};
+            _writer._encode_seq_column(seq_column, rid, &seq_val);
+            if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) {
+                append_or_merge_row(output_block, block, rid, skip_bitmap, 
have_delete_sign);
+                cur_seq_val = std::move(seq_val);
+            } else {
+                VLOG_DEBUG << fmt::format(
+                        "skip rid={} because its seq value is lower than the 
previous", rid);
+            }
+        }
+    }
+    return Status::OK();
+};
+
+Status BlockAggregator::aggregate_for_sequence_column(
+        vectorized::Block* block, size_t num_rows,
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+        vectorized::IOlapColumnDataAccessor* seq_column,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
+    VLOG_DEBUG << fmt::format("BlockAggregator::aggregate_for_sequence_column 
enter: block:{}\n",
+                              block->dump_data());
+    // the process logic here is the same as 
MemTable::_aggregate_for_flexible_partial_update_without_seq_col()
+    // after this function, there will be at most 2 rows for a specified key
+    std::vector<BitmapValue>* skip_bitmaps =
+            &(assert_cast<vectorized::ColumnBitmap*>(
+                      
block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
+                              .column->assume_mutable()
+                              .get())
+                      ->get_data());
+    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, 
num_rows);
+
+    auto filtered_block = _tablet_schema.create_block();
+    vectorized::MutableBlock output_block =
+            vectorized::MutableBlock::build_mutable_block(&filtered_block);
+
+    int same_key_rows {0};
+    std::string previous_key {};
+    for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
+        std::string key = _writer._full_encode_keys(key_columns, block_pos);
+        if (block_pos > 0 && previous_key == key) {
+            same_key_rows++;
+        } else {
+            if (same_key_rows > 0) {
+                RETURN_IF_ERROR(aggregate_rows(output_block, block, block_pos 
- same_key_rows,
+                                               block_pos, 
std::move(previous_key), skip_bitmaps,
+                                               delete_signs, seq_column, 
specified_rowsets,
+                                               segment_caches));
+            }
+            same_key_rows = 1;
+        }
+        previous_key = std::move(key);
+    }
+    if (same_key_rows > 0) {
+        RETURN_IF_ERROR(aggregate_rows(output_block, block, num_rows - 
same_key_rows, num_rows,
+                                       std::move(previous_key), skip_bitmaps, 
delete_signs,
+                                       seq_column, specified_rowsets, 
segment_caches));
+    }
+
+    block->swap(output_block.to_block());
+    VLOG_DEBUG << fmt::format(
+            "BlockAggregator::aggregate_for_sequence_column after: 
data.block:{}\n",
+            block->dump_data());
+    return Status::OK();
+}
+
+Status BlockAggregator::fill_sequence_column(vectorized::Block* block, size_t 
num_rows,
+                                             const FixedReadPlan& read_plan,
+                                             std::vector<BitmapValue>& 
skip_bitmaps) {
+    DCHECK(_tablet_schema.has_sequence_col());
+    std::vector<uint32_t> cids 
{static_cast<uint32_t>(_tablet_schema.sequence_col_idx())};
+    auto seq_col_unique_id = 
_tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
+
+    auto seq_col_block = _tablet_schema.create_block_by_cids(cids);
+    auto tmp_block = _tablet_schema.create_block_by_cids(cids);
+    std::map<uint32_t, uint32_t> read_index;
+    RETURN_IF_ERROR(read_plan.read_columns_by_plan(_tablet_schema, cids, 
_writer._rsid_to_rowset,
+                                                   seq_col_block, 
&read_index));
+
+    auto new_seq_col_ptr = 
tmp_block.get_by_position(0).column->assume_mutable();
+    const auto& old_seq_col_ptr = *seq_col_block.get_by_position(0).column;
+    const auto& cur_seq_col_ptr = 
*block->get_by_position(_tablet_schema.sequence_col_idx()).column;
+    for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
+        if (read_index.contains(block_pos)) {
+            new_seq_col_ptr->insert_from(old_seq_col_ptr, 
read_index[block_pos]);
+            skip_bitmaps[block_pos].remove(seq_col_unique_id);
+        } else {
+            new_seq_col_ptr->insert_from(cur_seq_col_ptr, block_pos);
+        }
+    }
+    block->replace_by_position(_tablet_schema.sequence_col_idx(), 
std::move(new_seq_col_ptr));
+    return Status::OK();
+}
+
+Status BlockAggregator::aggregate_for_insert_after_delete(
+        vectorized::Block* block, size_t num_rows,
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
+    VLOG_DEBUG << fmt::format(
+            "BlockAggregator::aggregate_for_insert_after_delete enter: 
data.block:{}\n",
+            block->dump_data());
+    // there will be at most 2 rows for a specified key in block when control 
flow reaches here
+    // after this function, there will not be duplicate rows in block
+
+    std::vector<BitmapValue>* skip_bitmaps =
+            &(assert_cast<vectorized::ColumnBitmap*>(
+                      
block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
+                              .column->assume_mutable()
+                              .get())
+                      ->get_data());
+    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, 
num_rows);
+
+    auto filter_column = vectorized::ColumnUInt8::create(num_rows, 1);
+    auto* __restrict filter_map = filter_column->get_data().data();
+    std::string previous_key {};
+    bool previous_has_delete_sign {false};
+    int duplicate_rows {0};
+    int32_t delete_sign_col_unique_id =
+            
_tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
+    auto seq_col_unique_id =
+            (_tablet_schema.sequence_col_idx() != -1)
+                    ? 
_tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id()
+                    : -1;
+    FixedReadPlan read_plan;
+    for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
+        size_t delta_pos = block_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+        std::string key = _writer._full_encode_keys(key_columns, delta_pos);
+        bool have_delete_sign =
+                (!skip_bitmap.contains(delete_sign_col_unique_id) && 
delete_signs[block_pos] != 0);
+        if (delta_pos > 0 && previous_key == key) {
+            // !!ATTENTION!!: We can only remove the row with delete sign if 
there is an insert with the same key after this row.
+            // If there is only a row with delete sign, we should keep it and 
can't remove it from block, because
+            // compaction will not use the delete bitmap when reading data. So 
there may still be rows with delete sign
+            // in later process
+            DCHECK(previous_has_delete_sign);
+            DCHECK(!have_delete_sign);
+            ++duplicate_rows;
+            RowLocation loc;
+            RowsetSharedPtr rowset;
+            Status st = _writer._tablet->lookup_row_key(
+                    key, &_tablet_schema, false, specified_rowsets, &loc,
+                    _writer._mow_context->max_version, segment_caches, 
&rowset, true);
+            DCHECK(st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());

Review Comment:
   check st.



##########
be/src/olap/partial_update_info.cpp:
##########
@@ -658,4 +664,422 @@ Status 
FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
     return Status::OK();
 }
 
+BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& 
vertical_segment_writer)
+        : _writer(vertical_segment_writer), 
_tablet_schema(*_writer._tablet_schema) {}
+
+void BlockAggregator::merge_one_row(vectorized::MutableBlock& dst_block,
+                                    vectorized::Block* src_block, int64_t rid,
+                                    BitmapValue& skip_bitmap) {
+    for (size_t cid {_tablet_schema.num_key_columns()}; cid < 
_tablet_schema.num_columns(); cid++) {
+        if (cid == _tablet_schema.skip_bitmap_col_idx()) {
+            auto& cur_skip_bitmap =
+                    
assert_cast<vectorized::ColumnBitmap*>(dst_block.mutable_columns()[cid].get())
+                            ->get_data()
+                            .back();
+            const auto& new_row_skip_bitmap =
+                    assert_cast<vectorized::ColumnBitmap*>(
+                            
src_block->get_by_position(cid).column->assume_mutable().get())
+                            ->get_data()[rid];
+            cur_skip_bitmap &= new_row_skip_bitmap;
+            continue;
+        }
+        if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) {
+            dst_block.mutable_columns()[cid]->pop_back(1);
+            
dst_block.mutable_columns()[cid]->insert_from(*src_block->get_by_position(cid).column,
+                                                          rid);
+        }
+    }
+    VLOG_DEBUG << fmt::format("merge a row, after merge, 
output_block.rows()={}, state: {}",
+                              dst_block.rows(), _state.to_string());
+}
+
+void BlockAggregator::append_one_row(vectorized::MutableBlock& dst_block,
+                                     vectorized::Block* src_block, int64_t 
rid) {
+    dst_block.add_row(src_block, rid);
+    _state.rows++;
+    VLOG_DEBUG << fmt::format("append a new row, after append, 
output_block.rows()={}, state: {}",
+                              dst_block.rows(), _state.to_string());
+}
+
+void BlockAggregator::remove_last_n_rows(vectorized::MutableBlock& dst_block, 
int n) {
+    if (n > 0) {
+        for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) {
+            DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n);
+            dst_block.mutable_columns()[cid]->pop_back(n);
+        }
+    }
+}
+
+void BlockAggregator::append_or_merge_row(vectorized::MutableBlock& dst_block,
+                                          vectorized::Block* src_block, 
int64_t rid,
+                                          BitmapValue& skip_bitmap, bool 
have_delete_sign) {
+    if (have_delete_sign) {
+        // remove all the previous batched rows
+        remove_last_n_rows(dst_block, _state.rows);
+        _state.rows = 0;
+        _state.has_row_with_delete_sign = true;
+
+        append_one_row(dst_block, src_block, rid);
+    } else {
+        if (_state.should_merge()) {
+            merge_one_row(dst_block, src_block, rid, skip_bitmap);
+        } else {
+            append_one_row(dst_block, src_block, rid);
+        }
+    }
+};
+
+Status BlockAggregator::aggregate_rows(
+        vectorized::MutableBlock& output_block, vectorized::Block* block, 
int64_t start,
+        int64_t end, std::string key, std::vector<BitmapValue>* skip_bitmaps,
+        const signed char* delete_signs, vectorized::IOlapColumnDataAccessor* 
seq_column,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
+    if (end - start == 1) {
+        output_block.add_row(block, start);
+        VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
+        return Status::OK();
+    }
+
+    auto seq_col_unique_id = 
_tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
+    auto delete_sign_col_unique_id =
+            
_tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
+
+    _state.reset();
+
+    RowLocation loc;
+    RowsetSharedPtr rowset;
+    std::string previous_encoded_seq_value {};
+    Status st = _writer._tablet->lookup_row_key(
+            key, &_tablet_schema, false, specified_rowsets, &loc, 
_writer._mow_context->max_version,
+            segment_caches, &rowset, true, &previous_encoded_seq_value);
+    int64_t pos = start;
+    DCHECK(st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());

Review Comment:
   There are other errors that can be reported by lookup_row_key; we should 
handle them. DCHECK is not enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to