This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c03b564d6a1 [fix](merge-on-write) segcompaction should process delete 
bitmap if necessary (#38369)
c03b564d6a1 is described below

commit c03b564d6a1ae477d58d3370fb5e9f9c60e92cc2
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Thu Aug 1 15:00:28 2024 +0800

    [fix](merge-on-write) segcompaction should process delete bitmap if 
necessary (#38369)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    When loading data into a unique key table with a sequence column, some
    rows in the current load job might be marked as deleted due to a lower
    sequence value.
    If there are many segments in such a load job, segcompaction might be
    triggered; since it currently doesn't process the delete bitmap, this
    causes a data correctness issue.
    
    For example:
    1. Initially we have 4 segments in the current load job, and due to the
    seq column, some rows are marked as deleted.
    2. After segcompaction, if we don't process the delete bitmap, its
    content still corresponds to the old segment layout, so rows 7, 14 and
    15 are not correctly marked as deleted in the newly generated segment 1.
    3. In this PR, we convert the old delete bitmap to fit the new segment
    layout. It uses an approach similar to base/cumulative compaction to
    convert delete bitmaps from the old layout to the new one, but the rowid
    conversion is simpler.
    
    
    
![whiteboard_exported_image-2](https://github.com/user-attachments/assets/a419b6a4-e583-457a-bf4e-56d9bd2a3544)
---
 be/src/olap/merger.cpp                    |  19 +-
 be/src/olap/merger.h                      |   6 +-
 be/src/olap/rowset/beta_rowset_writer.cpp |  34 +-
 be/src/olap/rowset/beta_rowset_writer.h   |   3 +-
 be/src/olap/rowset/segcompaction.cpp      |  67 ++-
 be/src/olap/rowset/segcompaction.h        |  13 +
 be/src/olap/simple_rowid_conversion.h     |  84 +++
 be/src/vec/olap/vertical_block_reader.cpp |   1 +
 be/test/common/status_test.cpp            |   2 -
 be/test/olap/segcompaction_mow_test.cpp   | 890 ++++++++++++++++++++++++++++++
 10 files changed, 1100 insertions(+), 19 deletions(-)

diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index ad70241ad87..87792db93a6 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -336,16 +336,12 @@ Status Merger::vertical_compact_one_group(
 }
 
 // for segcompaction
-Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType 
reader_type,
-                                          const TabletSchema& tablet_schema, 
bool is_key,
-                                          const std::vector<uint32_t>& 
column_group,
-                                          vectorized::RowSourcesBuffer* 
row_source_buf,
-                                          vectorized::VerticalBlockReader& 
src_block_reader,
-                                          segment_v2::SegmentWriter& 
dst_segment_writer,
-                                          int64_t max_rows_per_segment, 
Statistics* stats_output,
-                                          uint64_t* index_size, KeyBoundsPB& 
key_bounds) {
-    // build tablet reader
-    VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << 
max_rows_per_segment;
+Status Merger::vertical_compact_one_group(
+        int64_t tablet_id, ReaderType reader_type, const TabletSchema& 
tablet_schema, bool is_key,
+        const std::vector<uint32_t>& column_group, 
vectorized::RowSourcesBuffer* row_source_buf,
+        vectorized::VerticalBlockReader& src_block_reader,
+        segment_v2::SegmentWriter& dst_segment_writer, Statistics* 
stats_output,
+        uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* 
rowid_conversion) {
     // TODO: record_rowids
     vectorized::Block block = tablet_schema.create_block(column_group);
     size_t output_rows = 0;
@@ -362,6 +358,9 @@ Status Merger::vertical_compact_one_group(int64_t 
tablet_id, ReaderType reader_t
                                        "failed to write block when merging 
rowsets of tablet " +
                                                std::to_string(tablet_id));
 
+        if (is_key && rowid_conversion != nullptr) {
+            
rowid_conversion->add(src_block_reader.current_block_row_locations());
+        }
         output_rows += block.rows();
         block.clear_column_data();
     }
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 7513c90fbd1..cb05162b3bc 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -23,6 +23,7 @@
 #include "io/io_common.h"
 #include "olap/iterators.h"
 #include "olap/rowset/rowset_fwd.h"
+#include "olap/simple_rowid_conversion.h"
 #include "olap/tablet_fwd.h"
 
 namespace doris {
@@ -82,8 +83,9 @@ public:
                                              vectorized::RowSourcesBuffer* 
row_source_buf,
                                              vectorized::VerticalBlockReader& 
src_block_reader,
                                              segment_v2::SegmentWriter& 
dst_segment_writer,
-                                             int64_t max_rows_per_segment, 
Statistics* stats_output,
-                                             uint64_t* index_size, 
KeyBoundsPB& key_bounds);
+                                             Statistics* stats_output, 
uint64_t* index_size,
+                                             KeyBoundsPB& key_bounds,
+                                             SimpleRowIdConversion* 
rowid_conversion);
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 318d7d60502..4481f3b18c0 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -341,7 +341,12 @@ Status 
BetaRowsetWriter::_find_longest_consecutive_small_segment(
         if (is_large_segment) {
             if (segid == _segcompacted_point) {
                 // skip large segments at the front
+                auto dst_seg_id = _num_segcompacted.load();
                 
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+                if (_segcompaction_worker->need_convert_delete_bitmap()) {
+                    _segcompaction_worker->convert_segment_delete_bitmap(
+                            _context.mow_context->delete_bitmap, segid, 
dst_seg_id);
+                }
                 continue;
             } else {
                 // stop because we need consecutive segments
@@ -366,7 +371,13 @@ Status 
BetaRowsetWriter::_find_longest_consecutive_small_segment(
     }
     if (s == 1) { // poor bachelor, let it go
         VLOG_DEBUG << "only one candidate segment";
+        auto src_seg_id = _segcompacted_point.load();
+        auto dst_seg_id = _num_segcompacted.load();
         
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+        if (_segcompaction_worker->need_convert_delete_bitmap()) {
+            _segcompaction_worker->convert_segment_delete_bitmap(
+                    _context.mow_context->delete_bitmap, src_seg_id, 
dst_seg_id);
+        }
         segments->clear();
         return Status::OK();
     }
@@ -554,7 +565,7 @@ Status 
BetaRowsetWriter::_segcompaction_rename_last_segments() {
                 "code: {}",
                 _segcompaction_status.load());
     }
-    if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
+    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
         // no need if never segcompact before or all segcompacted
         return Status::OK();
     }
@@ -562,7 +573,12 @@ Status 
BetaRowsetWriter::_segcompaction_rename_last_segments() {
     // so that transaction can be committed ASAP
     VLOG_DEBUG << "segcompaction last few segments";
     for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
+        auto dst_segid = _num_segcompacted.load();
         
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+        if (_segcompaction_worker->need_convert_delete_bitmap()) {
+            _segcompaction_worker->convert_segment_delete_bitmap(
+                    _context.mow_context->delete_bitmap, segid, dst_segid);
+        }
     }
     return Status::OK();
 }
@@ -682,6 +698,20 @@ Status BetaRowsetWriter::_close_file_writers() {
             RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
                                            "close segment compaction worker 
failed");
         }
+        // process delete bitmap for mow table
+        if (is_segcompacted() && 
_segcompaction_worker->need_convert_delete_bitmap()) {
+            auto converted_delete_bitmap = 
_segcompaction_worker->get_converted_delete_bitmap();
+            // which means the segment compaction is triggerd
+            if (converted_delete_bitmap != nullptr) {
+                RowsetIdUnorderedSet rowsetids;
+                rowsetids.insert(rowset_id());
+                
context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
+                                                                     
rowsetids);
+                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 
0},
+                                                             {rowset_id(), 
UINT32_MAX, INT64_MAX});
+                
context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
+            }
+        }
     }
     return Status::OK();
 }
@@ -719,7 +749,7 @@ int64_t BaseBetaRowsetWriter::_num_seg() const {
 }
 
 int64_t BetaRowsetWriter::_num_seg() const {
-    return _is_segcompacted() ? _num_segcompacted : _num_segment;
+    return is_segcompacted() ? _num_segcompacted : _num_segment;
 }
 
 // update tablet schema when meet variant columns, before commit_txn
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 98bb43c6092..f8033accfca 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -227,6 +227,8 @@ public:
             std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
index_size,
             KeyBoundsPB& key_bounds);
 
+    // True when _num_segcompacted is positive, i.e. segcompaction has already
+    // assigned at least one segment id in the compacted numbering.
+    bool is_segcompacted() const { return 0 < _num_segcompacted; }
+
 private:
     // segment compaction
     friend class SegcompactionWorker;
@@ -240,7 +242,6 @@ private:
     Status _segcompaction_rename_last_segments();
     Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, 
int32_t segment_id);
     Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& 
segments);
-    bool _is_segcompacted() const { return _num_segcompacted > 0; }
     bool _check_and_set_is_doing_segcompaction();
     Status _rename_compacted_segments(int64_t begin, int64_t end);
     Status _rename_compacted_segment_plain(uint64_t seg_id);
diff --git a/be/src/olap/rowset/segcompaction.cpp 
b/be/src/olap/rowset/segcompaction.cpp
index f3e8d9f085c..0d3c55a6bf0 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -76,11 +76,14 @@ Status SegcompactionWorker::_get_segcompaction_reader(
         std::vector<uint32_t>& return_columns,
         std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
     const auto& ctx = _writer->_context;
+    bool record_rowids = need_convert_delete_bitmap() && is_key;
     StorageReadOptions read_options;
     read_options.stats = stat;
     read_options.use_page_cache = false;
     read_options.tablet_schema = ctx.tablet_schema;
+    read_options.record_rowids = record_rowids;
     std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    std::map<uint32_t, uint32_t> segment_rows;
     for (auto& seg_ptr : *segments) {
         std::unique_ptr<RowwiseIterator> iter;
         auto s = seg_ptr->new_iterator(schema, read_options, &iter);
@@ -89,6 +92,10 @@ Status SegcompactionWorker::_get_segcompaction_reader(
                                               s.to_string());
         }
         seg_iterators.push_back(std::move(iter));
+        segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows());
+    }
+    if (record_rowids && _rowid_conversion != nullptr) {
+        _rowid_conversion->reset_segment_map(segment_rows);
     }
 
     *reader = 
std::make_unique<vectorized::VerticalBlockReader>(&row_sources_buf);
@@ -102,6 +109,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     reader_params.return_columns = return_columns;
     reader_params.is_key_column_group = is_key;
     reader_params.use_page_cache = false;
+    reader_params.record_rowids = record_rowids;
     return (*reader)->init(reader_params, nullptr);
 }
 
@@ -235,6 +243,9 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
 
     DCHECK(ctx.tablet);
     auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet);
+    if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
+        _rowid_conversion = 
std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
+    }
 
     std::vector<std::vector<uint32_t>> column_groups;
     Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups);
@@ -265,8 +276,8 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
         Merger::Statistics merger_stats;
         RETURN_IF_ERROR(Merger::vertical_compact_one_group(
                 tablet->tablet_id(), ReaderType::READER_SEGMENT_COMPACTION, 
*ctx.tablet_schema,
-                is_key, column_ids, &row_sources_buf, *reader, *writer, 
INT_MAX, &merger_stats,
-                &index_size, key_bounds));
+                is_key, column_ids, &row_sources_buf, *reader, *writer, 
&merger_stats, &index_size,
+                key_bounds, _rowid_conversion.get()));
         total_index_size += index_size;
         if (is_key) {
             RETURN_IF_ERROR(row_sources_buf.flush());
@@ -292,6 +303,10 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
     }
 
     RETURN_IF_ERROR(_delete_original_segments(begin, end));
+    if (_rowid_conversion != nullptr) {
+        convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, 
end,
+                                      _writer->_num_segcompacted);
+    }
     RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));
 
     if (VLOG_DEBUG_IS_ON) {
@@ -352,6 +367,54 @@ void 
SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
     _is_compacting_state_mutable = true;
 }
 
+// Whether segcompaction has to rewrite the delete bitmap of the rowset being
+// written. This is only the case for merge-on-write unique-key tablets whose
+// schema has a sequence column, because only then can rows of the same load be
+// marked deleted against each other.
+bool SegcompactionWorker::need_convert_delete_bitmap() {
+    if (_writer == nullptr) {
+        return false;
+    }
+    const auto& tablet = _writer->context().tablet;
+    if (tablet == nullptr || tablet->keys_type() != KeysType::UNIQUE_KEYS) {
+        return false;
+    }
+    if (!tablet->enable_unique_key_merge_on_write()) {
+        return false;
+    }
+    return tablet->tablet_schema()->has_sequence_col();
+}
+
+// Carry the delete bitmap of a segment that was not compacted (only renamed)
+// over to its new segment id.
+//   src_delete_bitmap: bitmap keyed by this writer's rowset id and the
+//                      original segment ids.
+//   src_seg_id:        segment id before renaming.
+//   dest_seg_id:       segment id after renaming.
+// The result is accumulated in _converted_delete_bitmap, which is created
+// lazily. It is created even if there is nothing to copy, so that a non-null
+// converted bitmap always means "a conversion ran".
+void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
+                                                        uint32_t src_seg_id, uint32_t dest_seg_id) {
+    // lazy init
+    if (nullptr == _converted_delete_bitmap) {
+        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
+    }
+    auto rowset_id = _writer->context().rowset_id;
+    const auto* seg_map =
+            src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
+    // get() yields nullptr when the bitmap has no entry for this segment
+    // (presumably a segment without deleted rows); there is nothing to carry
+    // over, and dereferencing it would crash.
+    if (seg_map == nullptr) {
+        return;
+    }
+    _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
+                                  *seg_map);
+}
+
+// Remap the delete bitmap of segments [src_begin, src_end] (inclusive), which
+// were compacted into the single segment dst_seg_id. The remapping uses the
+// row id mapping recorded in _rowid_conversion while compacting the key
+// columns. Deleted rows with no recorded mapping are skipped. The result is
+// accumulated in _converted_delete_bitmap, which is created lazily.
+void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
+                                                        uint32_t src_begin, uint32_t src_end,
+                                                        uint32_t dst_seg_id) {
+    // lazy init
+    if (nullptr == _converted_delete_bitmap) {
+        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
+    }
+    DCHECK(_rowid_conversion != nullptr);
+    auto rowset_id = _writer->context().rowset_id;
+    RowLocation src(rowset_id, 0, 0);
+    for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) {
+        const auto* seg_map =
+                src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
+        // No entry for this segment: it has no deleted rows to remap, and
+        // dereferencing the null result would crash.
+        if (seg_map == nullptr) {
+            continue;
+        }
+        src.segment_id = seg_id;
+        for (uint32_t row_id : *seg_map) {
+            src.row_id = row_id;
+            auto dst_row_id = _rowid_conversion->get(src);
+            if (dst_row_id < 0) {
+                continue;
+            }
+            _converted_delete_bitmap->add({rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
+                                          dst_row_id);
+        }
+    }
+}
+
 bool SegcompactionWorker::cancel() {
     // return true if the task is canncellable (actual compaction is not 
started)
     // return false when the task is not cancellable (it is in the middle of 
segcompaction)
diff --git a/be/src/olap/rowset/segcompaction.h 
b/be/src/olap/rowset/segcompaction.h
index 5aef89992d3..67dd6889aad 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "olap/merger.h"
+#include "olap/simple_rowid_conversion.h"
 #include "olap/tablet.h"
 #include "segment_v2/segment.h"
 
@@ -51,6 +52,14 @@ public:
 
     void compact_segments(SegCompactionCandidatesSharedPtr segments);
 
+    // True when the rowset being written needs its delete bitmap rewritten
+    // after segcompaction (merge-on-write unique-key tablet with a sequence
+    // column).
+    bool need_convert_delete_bitmap();
+
+    // Copy the delete bitmap of a segment that was only renamed (not
+    // compacted) from src_seg_id to its new id dest_seg_id.
+    void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_seg_id,
+                                       uint32_t dest_seg_id);
+    // Remap the delete bitmap of segments [src_begin, src_end] (inclusive),
+    // compacted into dest_seg_id, through the recorded row id conversion.
+    void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_begin,
+                                       uint32_t src_end, uint32_t dest_seg_id);
+    // Bitmap accumulated by the convert_* calls; nullptr until one has run.
+    DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; }
+
     io::FileWriterPtr& get_file_writer() { return _file_writer; }
 
     // set the cancel flag, tasks already started will not be cancelled.
@@ -78,6 +87,10 @@ private:
     BetaRowsetWriter* _writer = nullptr;
     io::FileWriterPtr _file_writer;
 
+    // for unique key mow table
+    std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
+    DeleteBitmapPtr _converted_delete_bitmap;
+
     // the state is not mutable when 1)actual compaction operation started or 
2) cancelled
     std::atomic<bool> _is_compacting_state_mutable = true;
 };
diff --git a/be/src/olap/simple_rowid_conversion.h 
b/be/src/olap/simple_rowid_conversion.h
new file mode 100644
index 00000000000..1a89b01838f
--- /dev/null
+++ b/be/src/olap/simple_rowid_conversion.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "olap/olap_common.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+// Simplified version of rowid conversion, used by segcompaction: it maps rows
+// from several source segments of one rowset onto the rows of the single
+// destination segment they are compacted into.
+class SimpleRowIdConversion {
+public:
+    explicit SimpleRowIdConversion(const RowsetId& rowset_id) : _rowset_id(rowset_id) {}
+    ~SimpleRowIdConversion() = default;
+
+    // Prepare for a new segcompaction round.
+    // num_rows: source segment id -> number of rows in that segment.
+    // Each source row starts out unmapped (UINT32_MAX), and destination row
+    // ids restart at 0. Mappings from a previous round are dropped. Otherwise
+    // std::map::emplace would keep stale entries, and the map would keep
+    // growing when this object is reused across rounds.
+    void reset_segment_map(const std::map<uint32_t, uint32_t>& num_rows) {
+        _cur_dst_segment_rowid = 0;
+        _segments_rowid_map.clear();
+        for (const auto& [seg_id, rows] : num_rows) {
+            _segments_rowid_map.emplace(seg_id, std::vector<uint32_t>(rows, UINT32_MAX));
+        }
+    }
+
+    // Record source rows in the order they were written to the destination
+    // segment. The i-th recorded row (since the last reset) gets destination
+    // row id i. Entries whose row_id is -1 are skipped and do not consume a
+    // destination row id.
+    void add(const std::vector<RowLocation>& rss_row_ids) {
+        for (const auto& item : rss_row_ids) {
+            if (item.row_id == -1) {
+                continue;
+            }
+            // Take the destination id first, so destination numbering stays
+            // aligned with the written rows even if the lookup below fails.
+            const uint32_t dst_row_id = _cur_dst_segment_rowid++;
+            auto it = _segments_rowid_map.find(item.segment_id);
+            DCHECK(it != _segments_rowid_map.end() && it->second.size() > item.row_id);
+            // DCHECK compiles away in release builds. An unknown segment or an
+            // out-of-range row must never cause an out-of-bounds write.
+            if (it == _segments_rowid_map.end() || it->second.size() <= item.row_id) {
+                continue;
+            }
+            it->second[item.row_id] = dst_row_id;
+        }
+    }
+
+    // Look up the destination row id of `src`.
+    // Returns the row id in the destination segment. Returns -1 if `src` is in
+    // an unknown segment, is out of range, or was never recorded by add().
+    // Destination row ids are assumed to fit in int.
+    int get(const RowLocation& src) const {
+        auto it = _segments_rowid_map.find(src.segment_id);
+        if (it == _segments_rowid_map.end()) {
+            return -1;
+        }
+        const auto& rowid_map = it->second;
+        if (src.row_id >= rowid_map.size() || UINT32_MAX == rowid_map[src.row_id]) {
+            return -1;
+        }
+
+        return rowid_map[src.row_id];
+    }
+
+private:
+    // key:   source segment id.
+    // value: indexed by source row id; each element is the row id in the
+    //        destination segment, or UINT32_MAX if the row was not recorded.
+    std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
+
+    // dst rowset id
+    RowsetId _rowset_id;
+
+    // next row id to assign in the destination segment
+    std::uint32_t _cur_dst_segment_rowid = 0;
+};
+
+} // namespace doris
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index 98efd767961..56cb3f9c1c9 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -133,6 +133,7 @@ Status VerticalBlockReader::_init_collect_iter(const 
ReaderParams& read_params,
         _reader_context.need_ordered_result = true; // TODO: should it be?
         _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
         _reader_context.is_key_column_group = read_params.is_key_column_group;
+        _reader_context.record_rowids = read_params.record_rowids;
     }
 
     // build heap if key column iterator or build vertical merge iterator if 
value column
diff --git a/be/test/common/status_test.cpp b/be/test/common/status_test.cpp
index c1197dad0b1..e5477db1127 100644
--- a/be/test/common/status_test.cpp
+++ b/be/test/common/status_test.cpp
@@ -50,8 +50,6 @@ TEST_F(StatusTest, OK) {
 TEST_F(StatusTest, TStatusCodeWithStatus) {
     // The definition in status.h
     //extern ErrorCode::ErrorCodeState 
error_states[ErrorCode::MAX_ERROR_CODE_DEFINE_NUM];
-    extern ErrorCode::ErrorCodeState error_states;
-    extern ErrorCode::ErrorCodeInitializer error_code_init;
     // The definition in Status_types.h
     extern const std::map<int, const char*> _TStatusCode_VALUES_TO_NAMES;
     ErrorCode::error_code_init.check_init();
diff --git a/be/test/olap/segcompaction_mow_test.cpp 
b/be/test/olap/segcompaction_mow_test.cpp
new file mode 100644
index 00000000000..41e8ef74ed6
--- /dev/null
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -0,0 +1,890 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "gen_cpp/AgentService_types.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset_reader.h"
+#include "olap/rowset/beta_rowset_writer.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader_context.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "olap/utils.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Buffer size for getcwd() in SetUp().
+static constexpr uint32_t MAX_PATH_LEN = 1024;
+// Tablet id shared by the tablet meta and the rowset writer context.
+static constexpr uint32_t TABLET_ID = 12345;
+// Storage engine created in SetUp() and handed over to ExecEnv.
+static StorageEngine* s_engine;
+// Directory the test tablet writes its rowsets into.
+static const std::string lTestDir = "./data_test/data/segcompaction_mow_test";
+
+class SegCompactionMoWTest : public ::testing::TestWithParam<std::string> {
+public:
+    SegCompactionMoWTest() = default;
+
+    // Enable segcompaction, recreate the storage root under the current
+    // directory, open and start a fresh StorageEngine, and create lTestDir.
+    // Any failure here is fatal: ASSERT_* in SetUp() skips the test body
+    // instead of running it against a half-initialized environment.
+    void SetUp() override {
+        config::enable_segcompaction = true;
+        config::tablet_map_shard_size = 1;
+        config::txn_map_shard_size = 1;
+        config::txn_shard_size = 1;
+
+        char buffer[MAX_PATH_LEN];
+        // If getcwd fails, buffer is uninitialized and must not be used.
+        ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        config::storage_root_path = std::string(buffer) + "/data_test";
+
+        auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path);
+        ASSERT_TRUE(st.ok()) << st;
+        st = io::global_local_filesystem()->create_directory(config::storage_root_path);
+        ASSERT_TRUE(st.ok()) << st;
+
+        std::vector<StorePath> paths;
+        paths.emplace_back(config::storage_root_path, -1);
+
+        doris::EngineOptions options;
+        options.store_paths = paths;
+
+        auto engine = std::make_unique<StorageEngine>(options);
+        s_engine = engine.get();
+        ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+
+        Status s = s_engine->open();
+        ASSERT_TRUE(s.ok()) << s.to_string();
+
+        _data_dir = std::make_unique<DataDir>(*s_engine, lTestDir);
+        static_cast<void>(_data_dir->update_capacity());
+
+        ASSERT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok());
+
+        s = s_engine->start_bg_threads();
+        ASSERT_TRUE(s.ok()) << s.to_string();
+    }
+
+    // Restore the segcompaction switch turned on in SetUp().
+    void TearDown() override { config::enable_segcompaction = false; }
+
+protected:
+    OlapReaderStatistics _stats;
+
+    // Return true iff lTestDir contains exactly the file names in `vec`
+    // (order-insensitive, duplicates counted). Both lists are logged.
+    bool check_dir(const std::vector<std::string>& vec) {
+        std::vector<std::string> result;
+        for (const auto& entry : std::filesystem::directory_iterator(lTestDir)) {
+            result.push_back(entry.path().filename().string());
+        }
+
+        LOG(INFO) << "expected ls:" << std::endl;
+        for (const auto& i : vec) {
+            LOG(INFO) << i;
+        }
+        LOG(INFO) << "acutal ls:" << std::endl;
+        for (const auto& i : result) {
+            LOG(INFO) << i;
+        }
+
+        // Compare as sorted multisets. A "same size and every expected name is
+        // present" check would wrongly accept expected {a, a} vs actual {a, b}.
+        std::vector<std::string> expected(vec);
+        std::sort(expected.begin(), expected.end());
+        std::sort(result.begin(), result.end());
+        return expected == result;
+    }
+
+    // Build the test schema for a unique-key table with a sequence column:
+    //   k1 INT NULL (key), k2 INT NULL (key),
+    //   v1 INT NOT NULL (value, default 10),
+    //   SEQUENCE_COL INT NOT NULL (sequence column at index 3, default 10).
+    void create_tablet_schema(TabletSchemaSPtr tablet_schema) {
+        TabletSchemaPB tablet_schema_pb;
+        tablet_schema_pb.set_keys_type(UNIQUE_KEYS);
+        tablet_schema_pb.set_num_short_key_columns(2);
+        tablet_schema_pb.set_num_rows_per_row_block(1024);
+        tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+        tablet_schema_pb.set_next_column_unique_id(5);
+        // add seq column so that segcompaction will process delete bitmap
+        tablet_schema_pb.set_sequence_col_idx(3);
+
+        ColumnPB* column_1 = tablet_schema_pb.add_column();
+        column_1->set_unique_id(1);
+        column_1->set_name("k1");
+        column_1->set_type("INT");
+        column_1->set_is_key(true);
+        column_1->set_length(4);
+        column_1->set_index_length(4);
+        column_1->set_is_nullable(true);
+        column_1->set_is_bf_column(false);
+
+        ColumnPB* column_2 = tablet_schema_pb.add_column();
+        column_2->set_unique_id(2);
+        column_2->set_name("k2");
+        // TODO change to varchar(20) when dict encoding for string is supported
+        column_2->set_type("INT");
+        column_2->set_length(4);
+        column_2->set_index_length(4);
+        column_2->set_is_nullable(true);
+        column_2->set_is_key(true);
+        column_2->set_is_bf_column(false);
+
+        ColumnPB* v_column = tablet_schema_pb.add_column();
+        v_column->set_unique_id(3);
+        v_column->set_name("v1");
+        v_column->set_type("INT");
+        v_column->set_length(4);
+        v_column->set_is_key(false);
+        v_column->set_is_nullable(false);
+        v_column->set_is_bf_column(false);
+        v_column->set_default_value(std::to_string(10));
+        v_column->set_aggregation("NONE");
+
+        ColumnPB* seq_column = tablet_schema_pb.add_column();
+        seq_column->set_unique_id(4);
+        seq_column->set_name(SEQUENCE_COL);
+        seq_column->set_type("INT");
+        seq_column->set_length(4);
+        seq_column->set_is_key(false);
+        seq_column->set_is_nullable(false);
+        seq_column->set_is_bf_column(false);
+        seq_column->set_default_value(std::to_string(10));
+        seq_column->set_aggregation("NONE");
+
+        tablet_schema->init_from_pb(tablet_schema_pb);
+    }
+
+    // Fill `rowset_writer_context` for a VISIBLE beta rowset of version
+    // [10, 10] under lTestDir. The context is backed by a fresh Tablet whose
+    // meta enables merge-on-write and uses `tablet_schema`. `id` seeds the
+    // rowset id, so give each rowset a distinct id to avoid conflicts.
+    void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema,
+                                      RowsetWriterContext* rowset_writer_context) {
+        RowsetId new_rowset_id;
+        new_rowset_id.init(id);
+
+        auto& ctx = *rowset_writer_context;
+        ctx.rowset_id = new_rowset_id;
+        ctx.tablet_id = TABLET_ID;
+        ctx.tablet_schema_hash = 1111;
+        ctx.partition_id = 10;
+        ctx.rowset_type = BETA_ROWSET;
+        ctx.tablet_path = lTestDir;
+        ctx.rowset_state = VISIBLE;
+        ctx.tablet_schema = tablet_schema;
+        ctx.version.first = 10;
+        ctx.version.second = 10;
+
+        // Merge-on-write must be enabled for segcompaction to convert the
+        // delete bitmap.
+        auto meta = std::make_shared<TabletMeta>();
+        meta->_tablet_id = TABLET_ID;
+        static_cast<void>(meta->set_partition_id(10000));
+        meta->_schema = tablet_schema;
+        meta->_enable_unique_key_merge_on_write = true;
+        ctx.tablet = std::make_shared<Tablet>(*s_engine, meta, _data_dir.get(), "test_str");
+    }
+
+    // Create a reader for `rowset` into *result and initialize it with
+    // `context`. Failures are reported as non-fatal gtest expectations.
+    void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context,
+                                       RowsetReaderSharedPtr* result) {
+        Status create_st = rowset->create_reader(result);
+        EXPECT_EQ(Status::OK(), create_st);
+        EXPECT_TRUE(*result != nullptr);
+
+        Status init_st = (*result)->init(&context);
+        EXPECT_EQ(Status::OK(), init_st);
+    }
+
+    // Reads `rowset` back through a reader that applies `delete_bitmap` and checks that:
+    //   * every surviving row satisfies k1 == 100 * k3 + k2 and k3 % 3 != 0
+    //     (the tests mark rows with k3 % 3 == 0, or all rows, as deleted);
+    //   * exactly `expect_total_rows - rows_mark_deleted` rows survive;
+    //   * the rowset meta and the per-segment row counts both add up to
+    //     `expect_total_rows`.
+    // Returns false if the reader could not be created or if the current test has
+    // recorded any failure so far (including failures raised before this call).
+    bool check_data_read_with_delete_bitmap(TabletSchemaSPtr tablet_schema,
+                                            DeleteBitmapPtr delete_bitmap, RowsetSharedPtr rowset,
+                                            int expect_total_rows, int rows_mark_deleted) {
+        RowsetReaderContext reader_context;
+        reader_context.tablet_schema = tablet_schema;
+        // use this type to avoid cache from other ut
+        reader_context.reader_type = ReaderType::READER_QUERY;
+        reader_context.need_ordered_result = true;
+        std::vector<uint32_t> return_columns = {0, 1, 2};
+        reader_context.return_columns = &return_columns;
+        reader_context.stats = &_stats;
+        reader_context.delete_bitmap = delete_bitmap.get();
+
+        // without predicates
+        {
+            RowsetReaderSharedPtr rowset_reader;
+            create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader);
+            if (rowset_reader == nullptr) {
+                // create_and_init_rowset_reader() has already recorded the failure;
+                // stop here instead of dereferencing a null reader.
+                return false;
+            }
+
+            Status s;
+            uint32_t num_rows_read = 0;
+            bool eof = false;
+            while (!eof) {
+                auto output_block = std::make_shared<vectorized::Block>(
+                        tablet_schema->create_block(return_columns));
+                s = rowset_reader->next_block(output_block.get());
+                if (s != Status::OK()) {
+                    eof = true;
+                }
+                EXPECT_EQ(return_columns.size(), output_block->columns());
+                const size_t num_rows = output_block->rows();
+                if (num_rows > 0) {
+                    // Column pointers do not change per row, so fetch them once per block.
+                    vectorized::ColumnPtr col0 = output_block->get_by_position(0).column;
+                    vectorized::ColumnPtr col1 = output_block->get_by_position(1).column;
+                    vectorized::ColumnPtr col2 = output_block->get_by_position(2).column;
+                    for (size_t i = 0; i < num_rows; ++i) {
+                        auto field1 = (*col0)[i];
+                        auto field2 = (*col1)[i];
+                        auto field3 = (*col2)[i];
+                        uint32_t k1 = *reinterpret_cast<uint32_t*>((char*)(&field1));
+                        uint32_t k2 = *reinterpret_cast<uint32_t*>((char*)(&field2));
+                        uint32_t v3 = *reinterpret_cast<uint32_t*>((char*)(&field3));
+                        EXPECT_EQ(100 * v3 + k2, k1);
+                        EXPECT_TRUE(v3 % 3 != 0); // all v3%3==0 is deleted
+                        num_rows_read++;
+                    }
+                }
+                output_block->clear();
+            }
+            EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+            EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows);
+            EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted);
+
+            std::vector<uint32_t> segment_num_rows;
+            EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+            size_t total_num_rows = 0;
+            for (const auto& seg_rows : segment_num_rows) {
+                total_num_rows += seg_rows;
+            }
+            EXPECT_EQ(total_num_rows, expect_total_rows);
+        }
+        return !HasFailure();
+    }
+
+private:
+    std::unique_ptr<DataDir> _data_dir;
+};
+
+// Writes 15 small segments (4096 rows each, below segcompaction_candidate_max_rows)
+// and marks rows deleted in the MoW delete bitmap. After build(), the test expects
+// 7 segment files and a delete bitmap with 7 segment entries plus 1 sentinel entry
+// whose total cardinality is unchanged. It then checks that reads honor the
+// converted bitmap. Parameter "full" marks every row deleted; any other value
+// ("partial") marks every third row.
+TEST_P(SegCompactionMoWTest, SegCompactionThenRead) {
+    std::string delete_ratio = GetParam();
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    const int num_segments = 15;
+    const uint32_t rows_per_segment = 4096;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 10;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    { // write `num_segments * rows_per_segment` rows to rowset
+        RowsetWriterContext writer_context;
+        // random id, presumably so the files of the two parameter runs do not collide
+        int raw_rsid = rand();
+        create_rowset_writer_context(raw_rsid, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false);
+        ASSERT_TRUE(res.has_value()) << res.error();
+        auto rowset_writer = std::move(res).value();
+
+        const bool delete_all = (delete_ratio == "full");
+        // for segment "i", row "rid":
+        // k1 := rid * 100 + i
+        // k2 := i
+        // k3 := rid
+        // seq := 0
+        for (int i = 0; i < num_segments; ++i) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                uint32_t k1 = rid * 100 + i;
+                uint32_t k2 = i;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // "full" deletes every row, otherwise every third row is deleted
+                if (delete_all || rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            sleep(1);
+        }
+
+        // total number of deleted rows over all per-segment bitmaps before build()
+        size_t total_cardinality1 = 0;
+        for (const auto& entry : delete_bitmap->delete_bitmap) {
+            total_cardinality1 += entry.second.cardinality();
+        }
+        EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size());
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+
+        // after build() the rowset is expected to consist of segment files 0..6
+        std::vector<std::string> ls;
+        for (int segid = 0; segid < 7; ++segid) {
+            ls.push_back(fmt::format("{}_{}.dat", raw_rsid, segid));
+        }
+        EXPECT_TRUE(check_dir(ls));
+
+        // the converted bitmap must mark the same number of rows, not counting the
+        // sentinel entry (INVALID_SEGMENT_ID)
+        size_t total_cardinality2 = 0;
+        for (const auto& entry : delete_bitmap->delete_bitmap) {
+            if (std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID) {
+                continue;
+            }
+            total_cardinality2 += entry.second.cardinality();
+        }
+        // 7 segments + 1 sentinel mark
+        EXPECT_EQ(8, delete_bitmap->delete_bitmap.size());
+        EXPECT_EQ(total_cardinality1, total_cardinality2);
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   num_segments * rows_per_segment,
+                                                   rows_mark_deleted));
+}
+
+// Interleaves small ("o", 4096 rows) and big ("O", 6400 rows) segments in the order
+// given by the test name, with segcompaction_candidate_max_rows = 6000. Each run of
+// consecutive small segments is expected to end up as one segment file, while a lone
+// small segment keeps its own file. This gives the 7 files listed below and a delete
+// bitmap with 7 segment entries plus 1 sentinel. The eight-segment run contains
+// duplicate keys that segcompaction is expected to merge.
+// NOTE(review): segcompaction_batch_size is not set here, so it keeps whatever value
+// the global config currently holds (other tests in this file change it).
+TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    uint32_t total_written_rows = 0;
+    { // write all segments named in the test case to a single rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20048, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false);
+        ASSERT_TRUE(res.has_value()) << res.error();
+        auto rowset_writer = std::move(res).value();
+
+        int segid = 0;
+        // Writes and flushes one segment of `rows_per_segment` rows; row "rid" is
+        //   k1 := rid * 100 + key_suffix, k2 := key_suffix, k3 := rid, seq := 0,
+        // and every third row (rid % 3 == 0) is marked deleted for segment `segid`.
+        // Updates segid, rows_mark_deleted and total_written_rows.
+        auto write_segment = [&](uint32_t rows_per_segment, uint32_t key_suffix,
+                                 bool sleep_after_flush) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                uint32_t k1 = rid * 100 + key_suffix;
+                uint32_t k2 = key_suffix;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // mark delete every 3 rows
+                if (rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            if (sleep_after_flush) {
+                sleep(1);
+            }
+            segid++;
+            total_written_rows += rows_per_segment;
+        };
+
+        for (uint32_t i = 0; i < 4; ++i) { // oooo
+            write_segment(4096, i, false);
+        }
+        for (uint32_t i = 0; i < 2; ++i) { // OO
+            write_segment(6400, i, false);
+        }
+        write_segment(4096, 0, false); // o
+        write_segment(6400, 0, false); // O
+
+        { // oooooooo: small segments whose rows contain duplicate keys
+            const int num_segments = 8;
+            const uint32_t rows_per_segment = 4096;
+            std::map<uint32_t, uint32_t> unique_keys;
+            for (int i = 0; i < num_segments; ++i) {
+                vectorized::Block block = tablet_schema->create_block();
+                auto columns = block.mutate_columns();
+                for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                    // generate some duplicate rows, segment compaction will merge them
+                    int rand_i = rand() % (num_segments - 3);
+                    uint32_t k1 = rid * 100 + rand_i;
+                    uint32_t k2 = rand_i;
+                    uint32_t k3 = rid;
+                    uint32_t seq = 0;
+                    columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                    columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                    columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                    columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                    // mark delete every 3 rows
+                    if (rid % 3 == 0) {
+                        writer_context.mow_context->delete_bitmap->add(
+                                {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    }
+                    unique_keys.emplace(k1, rid);
+                }
+                s = rowset_writer->add_block(&block);
+                EXPECT_TRUE(s.ok());
+                s = rowset_writer->flush();
+                EXPECT_EQ(Status::OK(), s);
+                sleep(1);
+                segid++;
+            }
+            // these 8 segments should be compacted to 1 segment finally,
+            // so the rows finally written are the unique keys after compaction
+            total_written_rows += unique_keys.size();
+            for (const auto& entry : unique_keys) {
+                if (entry.second % 3 == 0) {
+                    rows_mark_deleted++;
+                }
+            }
+        }
+        write_segment(6400, 0, true); // O
+
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        // ooooOOoOooooooooO
+        ls.push_back("20048_0.dat"); // oooo
+        ls.push_back("20048_1.dat"); // O
+        ls.push_back("20048_2.dat"); // O
+        ls.push_back("20048_3.dat"); // o
+        ls.push_back("20048_4.dat"); // O
+        ls.push_back("20048_5.dat"); // oooooooo
+        ls.push_back("20048_6.dat"); // O
+        EXPECT_TRUE(check_dir(ls));
+        // 7 segments + 1 sentinel mark
+        EXPECT_EQ(8, delete_bitmap->delete_bitmap.size());
+    }
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   total_written_rows, rows_mark_deleted));
+}
+
+// Alternates big ("O", 6400 rows) and small ("o", 4096 rows) segments as O o O o O
+// with segcompaction_candidate_max_rows = 6000 and segcompaction_batch_size = 5.
+// No two small segments are adjacent, so all 5 segment files are expected to remain.
+// Reads must still honor the delete bitmap (every third row of each segment deleted).
+TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 5;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    uint32_t total_written_rows = 0;
+    { // write all segments named in the test case to a single rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20049, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false);
+        ASSERT_TRUE(res.has_value()) << res.error();
+        auto rowset_writer = std::move(res).value();
+
+        int segid = 0;
+        // Writes and flushes one segment of `rows_per_segment` rows; row "rid" is
+        //   k1 := rid * 100 + key_suffix, k2 := key_suffix, k3 := rid, seq := 0,
+        // and every third row (rid % 3 == 0) is marked deleted for segment `segid`.
+        // Updates segid, rows_mark_deleted and total_written_rows.
+        auto write_segment = [&](uint32_t rows_per_segment, uint32_t key_suffix,
+                                 bool sleep_after_flush) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                uint32_t k1 = rid * 100 + key_suffix;
+                uint32_t k2 = key_suffix;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // mark delete every 3 rows
+                if (rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            if (sleep_after_flush) {
+                sleep(1);
+            }
+            segid++;
+            total_written_rows += rows_per_segment;
+        };
+
+        write_segment(6400, 0, false); // O
+        write_segment(4096, 0, false); // o
+        write_segment(6400, 0, false); // O
+        write_segment(4096, 0, false); // o
+        write_segment(6400, 0, true);  // O
+
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        ls.push_back("20049_0.dat"); // O
+        ls.push_back("20049_1.dat"); // o
+        ls.push_back("20049_2.dat"); // O
+        ls.push_back("20049_3.dat"); // o
+        ls.push_back("20049_4.dat"); // O
+        EXPECT_TRUE(check_dir(ls));
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   total_written_rows, rows_mark_deleted));
+}
+
+// Writes 8 small segments (4096 rows each) with every third row marked deleted and
+// segcompaction_batch_size = 10. The writer is expected not to segcompact (presumably
+// because fewer than a batch of candidate segments accumulate). All 8 segment files
+// and the 8 per-segment delete bitmap entries must therefore remain, and reads must
+// honor the bitmap.
+TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    const int num_segments = 8;
+    const uint32_t rows_per_segment = 4096;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 10;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    { // write `num_segments * rows_per_segment` rows to rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20050, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false);
+        ASSERT_TRUE(res.has_value()) << res.error();
+        auto rowset_writer = std::move(res).value();
+        // for segment "i", row "rid":
+        // k1 := rid * 100 + i
+        // k2 := i
+        // k3 := rid
+        // seq := 0
+        for (int i = 0; i < num_segments; ++i) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                uint32_t k1 = rid * 100 + i;
+                uint32_t k2 = i;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // mark delete every 3 rows
+                if (rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            sleep(1);
+        }
+
+        EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size());
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        // without segcompaction every flushed segment keeps its own file
+        std::vector<std::string> ls;
+        for (int i = 0; i < num_segments; ++i) {
+            ls.push_back(fmt::format("20050_{}.dat", i));
+        }
+        EXPECT_TRUE(check_dir(ls));
+        EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size());
+
+        EXPECT_FALSE(static_cast<BetaRowsetWriter*>(rowset_writer.get())->is_segcompacted());
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   num_segments * rows_per_segment,
+                                                   rows_mark_deleted));
+}
+
+// Runs every TEST_P of SegCompactionMoWTest twice: once with "partial" (every third
+// row marked deleted) and once with "full" (every row marked deleted).
+INSTANTIATE_TEST_SUITE_P(Params, SegCompactionMoWTest,
+                         ::testing::Values(std::string("partial"), std::string("full")));
+
+} // namespace doris
+
+// @brief Test Stub


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to