dataroaring commented on code in PR #41701:
URL: https://github.com/apache/doris/pull/41701#discussion_r1802413848


##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -883,45 +951,160 @@ Status VerticalSegmentWriter::_merge_rows_for_sequence_column(
                 _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
                 previous_seq_slice = Slice {previous_encoded_seq_value};
             }
-            std::string cur_encoded_seq_value {};
-            _encode_seq_column(seq_column, rid_with_seq, &cur_encoded_seq_value);
+            if (batched_rows[2] != -1) {
+                _encode_seq_column(seq_column, batched_rows[2],
+                                   &row_with_delete_sign_encoded_seq_value);
+            }
+            if (batched_rows[3] != -1) {
+                _encode_seq_column(seq_column, batched_rows[3],
+                                   &row_without_delete_sign_encoded_seq_value);
+            }
+
+            auto remove_rows_without_seq = [&]() {
+                if (batched_rows[0] != -1) {
+                    filter_map[batched_rows[0]] = 0;
+                    ++duplicate_keys;
+                }
+                if (batched_rows[1] != -1) {
+                    filter_map[batched_rows[1]] = 0;
+                    ++duplicate_keys;
+                }
+            };
+
             // the encoded value is order-preserving, so we can use Slice::compare() to compare them
-            int res = previous_seq_slice.compare(Slice {cur_encoded_seq_value});
-            VLOG_DEBUG << fmt::format(
-                    "VerticalSegmentWriter::_merge_rows_for_sequence_column: 
rid_with_seq={}, "
-                    "rid_missing_seq={}, res={}",
-                    rid_with_seq, rid_missing_seq, res);
-            if (res > 0) {
-                filter_map[rid_with_seq] = 0;
-            } else if (res < 0) {
-                filter_map[rid_missing_seq] = 0;
+            if (batched_rows[2] != -1 && batched_rows[3] != -1) {
+                // it's guaranteed that the sequence value of batched_rows[2] is strictly smaller than batched_rows[3]'s
+                if (previous_seq_slice.compare(Slice {row_with_delete_sign_encoded_seq_value}) <=
+                    0) {
+                    remove_rows_without_seq();
+                } else if (previous_seq_slice.compare(
+                                   Slice {row_without_delete_sign_encoded_seq_value}) <= 0) {
+                    remove_rows_without_seq();
+                    filter_map[batched_rows[2]] = 0;
+                    ++duplicate_keys;
+                } else {
+                    filter_map[batched_rows[2]] = 0;
+                    ++duplicate_keys;
+                    filter_map[batched_rows[3]] = 0;
+                    ++duplicate_keys;
+                }
+            } else if (batched_rows[2] != -1) {
+                if (previous_seq_slice.compare(Slice {row_with_delete_sign_encoded_seq_value}) <=
+                    0) {
+                    remove_rows_without_seq();
+                } else {
+                    filter_map[batched_rows[2]] = 0;
+                    ++duplicate_keys;
+                }
             } else {
-                filter_map[std::min(rid_with_seq, rid_missing_seq)] = 0;
+                if (previous_seq_slice.compare(Slice {row_without_delete_sign_encoded_seq_value}) <=
+                    0) {
+                    remove_rows_without_seq();
+                } else {
+                    filter_map[batched_rows[3]] = 0;
+                    ++duplicate_keys;
+                }
+            }
+        }
+        has_same_rows = false;
+        batched_rows.fill(-1);
+    };
+
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows; block_pos++) {
+        size_t delta_pos = block_pos - data.row_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
+        bool row_has_delete_sign =
+                (!skip_bitmap.contains(delete_sign_col_unique_id) && 
delete_signs[block_pos] != 0);
+        VLOG_DEBUG << fmt::format("block_pos={}, row_has_sequence_col={}, 
row_has_delete_sign={}",
+                                  block_pos, row_has_sequence_col, 
row_has_delete_sign);
+        if (delta_pos > 0 && previous_key == key) {
+            if (!has_same_rows) {
+                batched_rows[get_idx(previous_has_seq_col, previous_has_delete_sign)] =
+                        block_pos - 1;
+                has_same_rows = true;
+            }
+            batched_rows[get_idx(row_has_sequence_col, row_has_delete_sign)] = block_pos;
+        } else {
+            if (has_same_rows) {
+                find_rows_to_filter(previous_key);
             }
         }
         previous_key = std::move(key);
         previous_has_seq_col = row_has_sequence_col;
+        previous_has_delete_sign = row_has_delete_sign;
+    }
+    if (has_same_rows) {
+        find_rows_to_filter(previous_key);
     }
     if (duplicate_keys > 0) {
-        auto num_cols = data.block->columns();
-        auto* block = const_cast<vectorized::Block*>(data.block);
-        block->insert({std::move(filter_column), 
std::make_shared<vectorized::DataTypeUInt8>(),
-                       "__dup_key_filter_col__"});
-        RETURN_IF_ERROR(vectorized::Block::filter_block(block, num_cols, num_cols));
-        int merged_rows = data.num_rows - block->rows();
-        VLOG_DEBUG << fmt::format(
-                "VerticalSegmentWriter::_merge_rows_for_sequence_column after 
filter: "
-                "data.block:{}\n",
-                data.block->dump_data());
-        if (duplicate_keys != merged_rows) {
-            auto msg = fmt::format(
-                    "duplicate_keys != merged_rows, duplicate_keys={}, 
merged_rows={}, "
-                    "num_rows={}, mutable_block->rows()={}",
-                    duplicate_keys, merged_rows, data.num_rows, block->rows());
-            DCHECK(false) << msg;
-            return Status::InternalError<false>(msg);
+        RETURN_IF_ERROR(_filter_block_for_flexible_partial_update(
+                data, std::move(filter_column), duplicate_keys, "__dup_key_filter_col__"));
+    }
+    return Status::OK();
+}
+
+Status VerticalSegmentWriter::_merge_rows_for_insert_after_delete(
+        RowsInBlock& data, std::vector<BitmapValue>* skip_bitmaps,
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
+        vectorized::IOlapColumnDataAccessor* seq_column, const signed char* delete_signs,
+        const std::vector<RowsetSharedPtr>& specified_rowsets,
+        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
+    VLOG_DEBUG << fmt::format(
+            "VerticalSegmentWriter::_merge_rows_for_insert_after_delete enter: 
data.block:{}\n",
+            data.block->dump_data());
+    // there will be at most 2 rows for a given key in the block when control flow reaches here
+    // after this function, there will be no duplicate rows in the block
+    auto filter_column = vectorized::ColumnUInt8::create(data.num_rows, 1);
+    auto* __restrict filter_map = filter_column->get_data().data();
+    std::string previous_key {};
+    bool previous_has_delete_sign {false};
+    int duplicate_rows {0};
+    int32_t delete_sign_col_unique_id =
+            _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + 
data.num_rows; block_pos++) {
+        size_t delta_pos = block_pos - data.row_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        bool have_delete_sign =
+                (!skip_bitmap.contains(delete_sign_col_unique_id) && 
delete_signs[block_pos] != 0);
+        if (delta_pos > 0 && previous_key == key) {
+            // !!ATTENTION!!: We can only remove the row with the delete sign if there is an insert with the same key after it.
+            // If there is only a row with a delete sign, we must keep it and can't remove it from the block, because
+            // compaction will not use the delete bitmap when reading data. So there may still be rows with delete signs
+            // in later processing.
+            DCHECK(previous_has_delete_sign);
+            DCHECK(!have_delete_sign);
+            ++duplicate_rows;
+            RowLocation loc;
+            RowsetSharedPtr rowset;
+            Status st = _tablet->lookup_row_key(key, _tablet_schema.get(), 
false, specified_rowsets,
+                                                &loc, 
_mow_context->max_version, segment_caches,
+                                                &rowset, true);
+            DCHECK(st.is<KEY_NOT_FOUND>() || st.ok());
+
+            Slice previous_seq_slice {};
+            if (st.ok()) {
+                // delete the existing row
+                _mow_context->delete_bitmap->add(
+                        {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
+                        loc.row_id);
+                VLOG_DEBUG << fmt::format(
+                        "_merge_rows_for_insert_after_delete: delete existing 
row, rowset_id={}, "
+                        "segment_id={}, row_id={}",
+                        loc.rowset_id.to_string(), loc.segment_id, loc.row_id);
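
For context: get_idx is not defined in this hunk. Below is a minimal sketch of the batched_rows slot layout it appears to produce, inferred from how slots [2] and [3] are consumed above; the exact mapping is an assumption, not taken from the PR.

    // Assumed layout of batched_rows, indexed by (has sequence column, has delete sign):
    //   [0] no sequence column, has delete sign
    //   [1] no sequence column, no delete sign
    //   [2] has sequence column, has delete sign
    //   [3] has sequence column, no delete sign
    inline int get_idx(bool has_seq_col, bool has_delete_sign) {
        return (has_seq_col ? 2 : 0) + (has_delete_sign ? 0 : 1);
    }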

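The !!ATTENTION!! note above is the crux of _merge_rows_for_insert_after_delete: a delete-sign row may only be dropped from the block when an insert with the same key follows it; any pre-existing row is then marked in the delete bitmap instead of being rewritten. A toy sketch of that filtering rule, using simplified stand-in types rather than the real Doris classes:

    #include <cstdint>
    #include <string>
    #include <vector>

    struct ToyRow {
        std::string key;
        bool delete_sign;
    };

    // Returns a filter map: 1 = keep, 0 = drop. Mirrors only the
    // "delete-sign row followed by an insert with the same key" case;
    // a lone delete-sign row is kept, matching the comment above.
    std::vector<uint8_t> filter_insert_after_delete(const std::vector<ToyRow>& block) {
        std::vector<uint8_t> filter(block.size(), 1);
        for (size_t i = 1; i < block.size(); ++i) {
            if (block[i].key == block[i - 1].key && block[i - 1].delete_sign &&
                !block[i].delete_sign) {
                filter[i - 1] = 0; // the insert supersedes the delete-sign row
            }
        }
        return filter;
    }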
Review Comment:
   Maybe we should implement a new writer class that inherits from VerticalSegmentWriter and allows duplicate rows.
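
   A rough sketch of that suggestion (the class and method names here are hypothetical, and the base class is stubbed purely for illustration):

       // Stand-in for the real VerticalSegmentWriter; only what is needed
       // to illustrate the suggestion is stubbed out here.
       class VerticalSegmentWriterBase {
       public:
           virtual ~VerticalSegmentWriterBase() = default;

       protected:
           // Hypothetical extension point: derived writers decide whether
           // duplicate keys must be merged away before flushing a segment.
           virtual bool allow_duplicate_rows() const { return false; }
       };

       // A writer variant that keeps duplicate rows, so the merge paths
       // (_merge_rows_for_sequence_column and friends) could be skipped.
       class DuplicateRowsSegmentWriter : public VerticalSegmentWriterBase {
       protected:
           bool allow_duplicate_rows() const override { return true; }
       };

   Whether a separate subclass pays for itself versus a flag on the existing writer is a design call for the PR authors.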



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

