This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c31344667ca [fix](merge-on-write) segcompaction should process delete bitmap if necessary (#38369) (#39749)
c31344667ca is described below

commit c31344667caf98b1960df64ec5450f36f7ea4884
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Thu Aug 22 14:22:54 2024 +0800

    [fix](merge-on-write) segcompaction should process delete bitmap if necessary (#38369) (#39749)
    
    cherry-pick #38369 and #38800
---
 be/src/olap/merger.cpp                    |  19 +-
 be/src/olap/merger.h                      |   6 +-
 be/src/olap/rowset/beta_rowset_writer.cpp |  35 +-
 be/src/olap/rowset/beta_rowset_writer.h   |   3 +-
 be/src/olap/rowset/segcompaction.cpp      |  72 ++-
 be/src/olap/rowset/segcompaction.h        |  13 +
 be/src/olap/simple_rowid_conversion.h     |  84 +++
 be/src/vec/olap/vertical_block_reader.cpp |   1 +
 be/test/olap/segcompaction_mow_test.cpp   | 907 ++++++++++++++++++++++++++++++
 9 files changed, 1123 insertions(+), 17 deletions(-)

diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 6bcdb2206c5..164b38dcb7d 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -283,16 +283,12 @@ Status Merger::vertical_compact_one_group(
 }
 
 // for segcompaction
-Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType 
reader_type,
-                                          TabletSchemaSPtr tablet_schema, bool 
is_key,
-                                          const std::vector<uint32_t>& 
column_group,
-                                          vectorized::RowSourcesBuffer* 
row_source_buf,
-                                          vectorized::VerticalBlockReader& 
src_block_reader,
-                                          segment_v2::SegmentWriter& 
dst_segment_writer,
-                                          int64_t max_rows_per_segment, 
Statistics* stats_output,
-                                          uint64_t* index_size, KeyBoundsPB& 
key_bounds) {
-    // build tablet reader
-    VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << 
max_rows_per_segment;
+Status Merger::vertical_compact_one_group(
+        TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr 
tablet_schema, bool is_key,
+        const std::vector<uint32_t>& column_group, 
vectorized::RowSourcesBuffer* row_source_buf,
+        vectorized::VerticalBlockReader& src_block_reader,
+        segment_v2::SegmentWriter& dst_segment_writer, Statistics* 
stats_output,
+        uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* 
rowid_conversion) {
     // TODO: record_rowids
     vectorized::Block block = tablet_schema->create_block(column_group);
     size_t output_rows = 0;
@@ -309,6 +305,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr 
tablet, ReaderType rea
                 dst_segment_writer.append_block(&block, 0, block.rows()),
                 "failed to write block when merging rowsets of tablet " + 
tablet->full_name());
 
+        if (is_key && rowid_conversion != nullptr) {
+            
rowid_conversion->add(src_block_reader.current_block_row_locations());
+        }
         output_rows += block.rows();
         block.clear_column_data();
     }
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 37291c548c3..6221dabc8b2 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "io/io_common.h"
 #include "olap/rowset/rowset_reader.h"
+#include "olap/simple_rowid_conversion.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
 
@@ -84,8 +85,9 @@ public:
                                              vectorized::RowSourcesBuffer* 
row_source_buf,
                                              vectorized::VerticalBlockReader& 
src_block_reader,
                                              segment_v2::SegmentWriter& 
dst_segment_writer,
-                                             int64_t max_rows_per_segment, 
Statistics* stats_output,
-                                             uint64_t* index_size, 
KeyBoundsPB& key_bounds);
+                                             Statistics* stats_output, 
uint64_t* index_size,
+                                             KeyBoundsPB& key_bounds,
+                                             SimpleRowIdConversion* 
rowid_conversion);
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 57e45b6f83b..31e3e628365 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -191,7 +191,12 @@ Status 
BetaRowsetWriter::_find_longest_consecutive_small_segment(
         if (is_large_segment) {
             if (segid == _segcompacted_point) {
                 // skip large segments at the front
+                auto dst_seg_id = _num_segcompacted.load();
                 
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+                if (_segcompaction_worker.need_convert_delete_bitmap()) {
+                    _segcompaction_worker.convert_segment_delete_bitmap(
+                            _context.mow_context->delete_bitmap, segid, 
dst_seg_id);
+                }
                 continue;
             } else {
                 // stop because we need consecutive segments
@@ -216,7 +221,13 @@ Status 
BetaRowsetWriter::_find_longest_consecutive_small_segment(
     }
     if (s == 1) { // poor bachelor, let it go
         VLOG_DEBUG << "only one candidate segment";
+        auto src_seg_id = _segcompacted_point.load();
+        auto dst_seg_id = _num_segcompacted.load();
         
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+        if (_segcompaction_worker.need_convert_delete_bitmap()) {
+            
_segcompaction_worker.convert_segment_delete_bitmap(_context.mow_context->delete_bitmap,
+                                                                src_seg_id, 
dst_seg_id);
+        }
         segments->clear();
         return Status::OK();
     }
@@ -374,7 +385,7 @@ Status 
BetaRowsetWriter::_segcompaction_rename_last_segments() {
         return Status::Error<SEGCOMPACTION_FAILED>(
                 "BetaRowsetWriter::_segcompaction_rename_last_segments meet 
invalid state");
     }
-    if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
+    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
         // no need if never segcompact before or all segcompacted
         return Status::OK();
     }
@@ -382,7 +393,12 @@ Status 
BetaRowsetWriter::_segcompaction_rename_last_segments() {
     // so that transaction can be committed ASAP
     VLOG_DEBUG << "segcompaction last few segments";
     for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
+        auto dst_segid = _num_segcompacted.load();
         
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+        if (_segcompaction_worker.need_convert_delete_bitmap()) {
+            
_segcompaction_worker.convert_segment_delete_bitmap(_context.mow_context->delete_bitmap,
+                                                                segid, 
dst_segid);
+        }
     }
     return Status::OK();
 }
@@ -543,6 +559,21 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
             
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(),
                                            "close segment compaction worker 
failed");
         }
+        // process delete bitmap for mow table
+        if (is_segcompacted() && 
_segcompaction_worker.need_convert_delete_bitmap()) {
+            auto converted_delete_bitmap = 
_segcompaction_worker.get_converted_delete_bitmap();
+            // which means the segment compaction is triggered
+            if (converted_delete_bitmap != nullptr) {
+                RowsetIdUnorderedSet rowsetids;
+                rowsetids.insert(rowset_id());
+                auto tablet = static_cast<Tablet*>(_context.tablet.get());
+                
tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
+                                                           rowsetids);
+                _context.mow_context->delete_bitmap->remove({rowset_id(), 0, 
0},
+                                                            {rowset_id(), 
UINT32_MAX, INT64_MAX});
+                
_context.mow_context->delete_bitmap->merge(*converted_delete_bitmap);
+            }
+        }
     }
     // When building a rowset, we must ensure that the current _segment_writer 
has been
     // flushed, that is, the current _segment_writer is nullptr
@@ -612,7 +643,7 @@ void BetaRowsetWriter::_build_rowset_meta_with_spec_field(
 
 Status BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> 
rowset_meta,
                                             bool check_segment_num) {
-    int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
+    int64_t num_seg = is_segcompacted() ? _num_segcompacted : _num_segment;
     int64_t num_rows_written = 0;
     int64_t total_data_size = 0;
     int64_t total_index_size = 0;
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 38f9eb7923d..90135b2f0fc 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -134,6 +134,8 @@ public:
         return _context.partial_update_info && 
_context.partial_update_info->is_partial_update;
     }
 
+    bool is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
+
 private:
     Status _do_add_block(const vectorized::Block* block,
                          std::unique_ptr<segment_v2::SegmentWriter>* 
segment_writer,
@@ -155,7 +157,6 @@ private:
     Status _segcompaction_rename_last_segments();
     Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, 
int32_t segment_id);
     Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& 
segments);
-    bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
 
     bool _check_and_set_is_doing_segcompaction();
 
diff --git a/be/src/olap/rowset/segcompaction.cpp 
b/be/src/olap/rowset/segcompaction.cpp
index 598a78326ed..d134f87e8b9 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -75,11 +75,14 @@ Status SegcompactionWorker::_get_segcompaction_reader(
         std::vector<uint32_t>& return_columns,
         std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
     auto ctx = _writer->_context;
+    bool record_rowids = need_convert_delete_bitmap() && is_key;
     StorageReadOptions read_options;
     read_options.stats = stat;
     read_options.use_page_cache = false;
     read_options.tablet_schema = ctx.tablet_schema;
+    read_options.record_rowids = record_rowids;
     std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+    std::map<uint32_t, uint32_t> segment_rows;
     for (auto& seg_ptr : *segments) {
         std::unique_ptr<RowwiseIterator> iter;
         auto s = seg_ptr->new_iterator(schema, read_options, &iter);
@@ -88,6 +91,10 @@ Status SegcompactionWorker::_get_segcompaction_reader(
                                               s.to_string());
         }
         seg_iterators.push_back(std::move(iter));
+        segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows());
+    }
+    if (record_rowids && _rowid_conversion != nullptr) {
+        _rowid_conversion->reset_segment_map(segment_rows);
     }
 
     *reader = std::unique_ptr<vectorized::VerticalBlockReader> {
@@ -102,6 +109,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
     reader_params.return_columns = return_columns;
     reader_params.is_key_column_group = is_key;
     reader_params.use_page_cache = false;
+    reader_params.record_rowids = record_rowids;
     return (*reader)->init(reader_params);
 }
 
@@ -219,6 +227,9 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
 
     DCHECK(ctx.tablet);
     auto tablet = ctx.tablet;
+    if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
+        _rowid_conversion = 
std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
+    }
 
     std::vector<std::vector<uint32_t>> column_groups;
     Merger::vertical_split_columns(ctx.tablet_schema, &column_groups);
@@ -248,8 +259,8 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
         Merger::Statistics merger_stats;
         RETURN_IF_ERROR(Merger::vertical_compact_one_group(
                 tablet, ReaderType::READER_SEGMENT_COMPACTION, 
ctx.tablet_schema, is_key,
-                column_ids, &row_sources_buf, *reader, *writer, INT_MAX, 
&merger_stats, &index_size,
-                key_bounds));
+                column_ids, &row_sources_buf, *reader, *writer, &merger_stats, 
&index_size,
+                key_bounds, _rowid_conversion.get()));
         total_index_size += index_size;
         if (is_key) {
             row_sources_buf.flush();
@@ -275,6 +286,10 @@ Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
     }
 
     RETURN_IF_ERROR(_delete_original_segments(begin, end));
+    if (_rowid_conversion != nullptr) {
+        convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, 
end,
+                                      _writer->_num_segcompacted);
+    }
     RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));
 
     if (VLOG_DEBUG_IS_ON) {
@@ -332,4 +347,57 @@ void 
SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
     }
 }
 
+bool SegcompactionWorker::need_convert_delete_bitmap() {
+    if (_writer == nullptr) {
+        return false;
+    }
+    auto tablet = _writer->_context.tablet;
+    return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+           tablet->enable_unique_key_merge_on_write() &&
+           tablet->tablet_schema()->has_sequence_col();
+}
+
+void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr 
src_delete_bitmap,
+                                                        uint32_t src_seg_id, 
uint32_t dest_seg_id) {
+    // lazy init
+    if (nullptr == _converted_delete_bitmap) {
+        _converted_delete_bitmap = 
std::make_shared<DeleteBitmap>(_writer->_context.tablet_id);
+    }
+    auto rowset_id = _writer->_context.rowset_id;
+    const auto* seg_map =
+            src_delete_bitmap->get({rowset_id, src_seg_id, 
DeleteBitmap::TEMP_VERSION_COMMON});
+    if (seg_map != nullptr) {
+        _converted_delete_bitmap->set({rowset_id, dest_seg_id, 
DeleteBitmap::TEMP_VERSION_COMMON},
+                                      *seg_map);
+    }
+}
+
+void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr 
src_delete_bitmap,
+                                                        uint32_t src_begin, 
uint32_t src_end,
+                                                        uint32_t dst_seg_id) {
+    // lazy init
+    if (nullptr == _converted_delete_bitmap) {
+        _converted_delete_bitmap = 
std::make_shared<DeleteBitmap>(_writer->_context.tablet_id);
+    }
+    auto rowset_id = _writer->_context.rowset_id;
+    RowLocation src(rowset_id, 0, 0);
+    for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) {
+        const auto* seg_map =
+                src_delete_bitmap->get({rowset_id, seg_id, 
DeleteBitmap::TEMP_VERSION_COMMON});
+        if (!seg_map) {
+            continue;
+        }
+        src.segment_id = seg_id;
+        for (unsigned int row_id : *seg_map) {
+            src.row_id = row_id;
+            auto dst_row_id = _rowid_conversion->get(src);
+            if (dst_row_id < 0) {
+                continue;
+            }
+            _converted_delete_bitmap->add(
+                    {rowset_id, dst_seg_id, 
DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id);
+        }
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/segcompaction.h 
b/be/src/olap/rowset/segcompaction.h
index 273fbdec560..5f832f664e4 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "olap/merger.h"
+#include "olap/simple_rowid_conversion.h"
 #include "olap/tablet.h"
 #include "segment_v2/segment.h"
 
@@ -52,6 +53,14 @@ public:
 
     void compact_segments(SegCompactionCandidatesSharedPtr segments);
 
+    bool need_convert_delete_bitmap();
+
+    void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, 
uint32_t src_seg_id,
+                                       uint32_t dest_seg_id);
+    void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, 
uint32_t src_begin,
+                                       uint32_t src_end, uint32_t dest_seg_id);
+    DeleteBitmapPtr get_converted_delete_bitmap() { return 
_converted_delete_bitmap; }
+
     io::FileWriterPtr& get_file_writer() { return _file_writer; }
 
     // set the cancel flag, tasks already started will not be cancelled.
@@ -78,5 +87,9 @@ private:
     BetaRowsetWriter* _writer;
     io::FileWriterPtr _file_writer;
     std::atomic<bool> _cancelled = false;
+
+    // for unique key mow table
+    std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
+    DeleteBitmapPtr _converted_delete_bitmap;
 };
 } // namespace doris
diff --git a/be/src/olap/simple_rowid_conversion.h 
b/be/src/olap/simple_rowid_conversion.h
new file mode 100644
index 00000000000..1a89b01838f
--- /dev/null
+++ b/be/src/olap/simple_rowid_conversion.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "olap/olap_common.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+// Simplified row id conversion used by segment compaction (segcompaction):
+// maps rows of several source segments to rows of one destination segment.
+class SimpleRowIdConversion {
+public:
+    SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {};
+    ~SimpleRowIdConversion() = default;
+
+    // Reset dst row counter; add a UINT32_MAX-filled map per segment (emplace: no overwrite).
+    void reset_segment_map(const std::map<uint32_t, uint32_t>& num_rows) {
+        _cur_dst_segment_rowid = 0;
+        for (auto seg_rows : num_rows) {
+            _segments_rowid_map.emplace(seg_rows.first,
+                                        std::vector<uint32_t>(seg_rows.second, 
UINT32_MAX));
+        }
+    }
+
+    // Map each src row, in order, to the next dst row id; entries with row_id == -1 are skipped.
+    void add(const std::vector<RowLocation>& rss_row_ids) {
+        for (auto& item : rss_row_ids) {
+            if (item.row_id == -1) {
+                continue;
+            }
+            DCHECK(_segments_rowid_map.find(item.segment_id) != 
_segments_rowid_map.end() &&
+                   _segments_rowid_map[item.segment_id].size() > item.row_id);
+            _segments_rowid_map[item.segment_id][item.row_id] = 
_cur_dst_segment_rowid++;
+        }
+    }
+
+    // Return the dst row id mapped from src (segment_id, row_id), as an int;
+    // return -1 if the segment is unknown, row_id is out of range, or the row is unmapped.
+    int get(const RowLocation& src) const {
+        auto it = _segments_rowid_map.find(src.segment_id);
+        if (it == _segments_rowid_map.end()) {
+            return -1;
+        }
+        const auto& rowid_map = it->second;
+        if (src.row_id >= rowid_map.size() || UINT32_MAX == 
rowid_map[src.row_id]) {
+            return -1;
+        }
+
+        return rowid_map[src.row_id];
+    }
+
+private:
+    // key:   source segment id.
+    // value: indexed by source row id; each entry is the row id in the destination
+    //        segment, or UINT32_MAX if that row has not been mapped (yet).
+    std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
+
+    // dst rowset id (NOTE(review): only stored; never read within this class)
+    RowsetId _rowst_id;
+
+    // next row id to assign in the dst segment (reset by reset_segment_map)
+    std::uint32_t _cur_dst_segment_rowid = 0;
+};
+
+} // namespace doris
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index f1f4f3a3895..ee2f52462f8 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -120,6 +120,7 @@ Status VerticalBlockReader::_init_collect_iter(const 
ReaderParams& read_params)
         _reader_context.need_ordered_result = true; // TODO: should it be?
         _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
         _reader_context.is_key_column_group = read_params.is_key_column_group;
+        _reader_context.record_rowids = read_params.record_rowids;
     }
 
     // build heap if key column iterator or build vertical merge iterator if 
value column
diff --git a/be/test/olap/segcompaction_mow_test.cpp 
b/be/test/olap/segcompaction_mow_test.cpp
new file mode 100644
index 00000000000..743b0c2a62e
--- /dev/null
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -0,0 +1,907 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "gen_cpp/AgentService_types.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset_reader.h"
+#include "olap/rowset/beta_rowset_writer.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader_context.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "olap/utils.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+static const uint32_t MAX_PATH_LEN = 1024;
+static const uint32_t TABLET_ID = 12345;
+static StorageEngine* s_engine;
+static const std::string lTestDir = "./data_test/data/segcompaction_mow_test";
+
+class SegCompactionMoWTest : public ::testing::TestWithParam<std::string> {
+public:
+    SegCompactionMoWTest() = default;
+
+    void SetUp() {
+        config::enable_segcompaction = true;
+        config::tablet_map_shard_size = 1;
+        config::txn_map_shard_size = 1;
+        config::txn_shard_size = 1;
+
+        char buffer[MAX_PATH_LEN];
+        EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        config::storage_root_path = std::string(buffer) + "/data_test";
+
+        auto st = 
io::global_local_filesystem()->delete_directory(config::storage_root_path);
+        ASSERT_TRUE(st.ok()) << st;
+        st = 
io::global_local_filesystem()->create_directory(config::storage_root_path);
+        ASSERT_TRUE(st.ok()) << st;
+
+        std::vector<StorePath> paths;
+        paths.emplace_back(config::storage_root_path, -1);
+
+        doris::EngineOptions options;
+        options.store_paths = paths;
+        Status s = doris::StorageEngine::open(options, &s_engine);
+        EXPECT_TRUE(s.ok()) << s.to_string();
+
+        ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+        exec_env->set_storage_engine(s_engine);
+        _data_dir = new DataDir(lTestDir, 1000000000);
+        static_cast<void>(_data_dir->init());
+        static_cast<void>(_data_dir->update_capacity());
+
+        
EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok());
+
+        s = s_engine->start_bg_threads();
+        EXPECT_TRUE(s.ok()) << s.to_string();
+    }
+
+    void TearDown() {
+        SAFE_DELETE(_data_dir);
+        
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(lTestDir).ok());
+        if (s_engine != nullptr) {
+            s_engine->stop();
+            delete s_engine;
+            s_engine = nullptr;
+            ExecEnv::GetInstance()->set_storage_engine(nullptr);
+        }
+        config::enable_segcompaction = false;
+    }
+
+protected:
+    OlapReaderStatistics _stats;
+
+    bool check_dir(std::vector<std::string>& vec) {
+        std::vector<std::string> result;
+        for (const auto& entry : 
std::filesystem::directory_iterator(lTestDir)) {
+            result.push_back(std::filesystem::path(entry.path()).filename());
+        }
+
+        LOG(INFO) << "expected ls:" << std::endl;
+        for (auto& i : vec) {
+            LOG(INFO) << i;
+        }
+        LOG(INFO) << "acutal ls:" << std::endl;
+        for (auto& i : result) {
+            LOG(INFO) << i;
+        }
+
+        if (result.size() != vec.size()) {
+            return false;
+        } else {
+            for (auto& i : vec) {
+                if (std::find(result.begin(), result.end(), i) == 
result.end()) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    // (k1 int, k2 varchar(20), k3 int) keys (k1, k2)
+    void create_tablet_schema(TabletSchemaSPtr tablet_schema) {
+        TabletSchemaPB tablet_schema_pb;
+        tablet_schema_pb.set_keys_type(UNIQUE_KEYS);
+        tablet_schema_pb.set_num_short_key_columns(2);
+        tablet_schema_pb.set_num_rows_per_row_block(1024);
+        tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+        tablet_schema_pb.set_next_column_unique_id(5);
+        // add seq column so that segcompaction will process delete bitmap
+        tablet_schema_pb.set_sequence_col_idx(3);
+
+        ColumnPB* column_1 = tablet_schema_pb.add_column();
+        column_1->set_unique_id(1);
+        column_1->set_name("k1");
+        column_1->set_type("INT");
+        column_1->set_is_key(true);
+        column_1->set_length(4);
+        column_1->set_index_length(4);
+        column_1->set_is_nullable(true);
+        column_1->set_is_bf_column(false);
+
+        ColumnPB* column_2 = tablet_schema_pb.add_column();
+        column_2->set_unique_id(2);
+        column_2->set_name("k2");
+        column_2->set_type(
+                "INT"); // TODO change to varchar(20) when dict encoding for 
string is supported
+        column_2->set_length(4);
+        column_2->set_index_length(4);
+        column_2->set_is_nullable(true);
+        column_2->set_is_key(true);
+        column_2->set_is_nullable(true);
+        column_2->set_is_bf_column(false);
+
+        ColumnPB* v_column = tablet_schema_pb.add_column();
+        v_column->set_unique_id(3);
+        v_column->set_name(fmt::format("v1"));
+        v_column->set_type("INT");
+        v_column->set_length(4);
+        v_column->set_is_key(false);
+        v_column->set_is_nullable(false);
+        v_column->set_is_bf_column(false);
+        v_column->set_default_value(std::to_string(10));
+        v_column->set_aggregation("NONE");
+
+        ColumnPB* seq_column = tablet_schema_pb.add_column();
+        seq_column->set_unique_id(4);
+        seq_column->set_name(SEQUENCE_COL);
+        seq_column->set_type("INT");
+        seq_column->set_length(4);
+        seq_column->set_is_key(false);
+        seq_column->set_is_nullable(false);
+        seq_column->set_is_bf_column(false);
+        seq_column->set_default_value(std::to_string(10));
+        seq_column->set_aggregation("NONE");
+
+        tablet_schema->init_from_pb(tablet_schema_pb);
+    }
+
+    // use different id to avoid conflict
+    void create_rowset_writer_context(int64_t id, TabletSchemaSPtr 
tablet_schema,
+                                      RowsetWriterContext* 
rowset_writer_context) {
+        RowsetId rowset_id;
+        rowset_id.init(id);
+        // rowset_writer_context->data_dir = _data_dir.get();
+        rowset_writer_context->rowset_id = rowset_id;
+        rowset_writer_context->tablet_id = TABLET_ID;
+        rowset_writer_context->tablet_schema_hash = 1111;
+        rowset_writer_context->partition_id = 10;
+        rowset_writer_context->rowset_type = BETA_ROWSET;
+        rowset_writer_context->rowset_dir = lTestDir;
+        rowset_writer_context->rowset_state = VISIBLE;
+        rowset_writer_context->tablet_schema = tablet_schema;
+        rowset_writer_context->version.first = 10;
+        rowset_writer_context->version.second = 10;
+
+        TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>();
+        tablet_meta->_tablet_id = TABLET_ID;
+        static_cast<void>(tablet_meta->set_partition_id(10000));
+        tablet_meta->_schema = tablet_schema;
+        tablet_meta->_enable_unique_key_merge_on_write = true;
+        auto tablet = std::make_shared<Tablet>(tablet_meta, _data_dir, 
"test_str");
+        tablet->init();
+        // tablet->key
+        rowset_writer_context->tablet = tablet;
+    }
+
+    void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& 
context,
+                                       RowsetReaderSharedPtr* result) {
+        auto s = rowset->create_reader(result);
+        EXPECT_EQ(Status::OK(), s);
+        EXPECT_TRUE(*result != nullptr);
+
+        s = (*result)->init(&context);
+        EXPECT_EQ(Status::OK(), s);
+    }
+
+    // Reads all rows of `rowset` through a query reader that applies
+    // `delete_bitmap`, and verifies (via gtest EXPECT_*):
+    //  - every surviving row satisfies k1 == 100 * v3 + k2 (columns 0, 1, 2);
+    //  - unless `skip_value_check`, no surviving row has an even k2 together
+    //    with v3 % 3 == 0 (the rows the tests in this file mark deleted);
+    //  - reading ends with END_OF_FILE;
+    //  - the rowset meta reports `expect_total_rows` rows, and exactly
+    //    `expect_total_rows - rows_mark_deleted` rows survive the bitmap;
+    //  - the per-segment row counts add up to `expect_total_rows`.
+    // Returns false when no reader could be created (nothing could be checked),
+    // true otherwise; value mismatches are reported through EXPECT_*.
+    bool check_data_read_with_delete_bitmap(TabletSchemaSPtr tablet_schema,
+                                            DeleteBitmapPtr delete_bitmap, RowsetSharedPtr rowset,
+                                            int expect_total_rows, int rows_mark_deleted,
+                                            bool skip_value_check = false) {
+        RowsetReaderContext reader_context;
+        reader_context.tablet_schema = tablet_schema;
+        // use this type to avoid cache from other ut
+        reader_context.reader_type = ReaderType::READER_QUERY;
+        reader_context.need_ordered_result = true;
+        std::vector<uint32_t> return_columns = {0, 1, 2};
+        reader_context.return_columns = &return_columns;
+        reader_context.stats = &_stats;
+        reader_context.delete_bitmap = delete_bitmap.get();
+
+        // without predicates
+        RowsetReaderSharedPtr rowset_reader;
+        create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader);
+        if (rowset_reader == nullptr) {
+            // The failure was already reported; do not dereference a null reader.
+            return false;
+        }
+
+        Status s;
+        uint32_t num_rows_read = 0;
+        bool eof = false;
+        while (!eof) {
+            // A fresh block per batch: the reader fills it according to the schema.
+            vectorized::Block output_block = tablet_schema->create_block(return_columns);
+            s = rowset_reader->next_block(&output_block);
+            if (!s.ok()) {
+                eof = true;
+            }
+            EXPECT_EQ(return_columns.size(), output_block.columns());
+            // Column lookups do not depend on the row, so resolve them once per block.
+            const vectorized::ColumnPtr& col0 = output_block.get_by_position(0).column;
+            const vectorized::ColumnPtr& col1 = output_block.get_by_position(1).column;
+            const vectorized::ColumnPtr& col2 = output_block.get_by_position(2).column;
+            for (size_t i = 0; i < output_block.rows(); ++i) {
+                auto field1 = (*col0)[i];
+                auto field2 = (*col1)[i];
+                auto field3 = (*col2)[i];
+                uint32_t k1 = *reinterpret_cast<uint32_t*>((char*)(&field1));
+                uint32_t k2 = *reinterpret_cast<uint32_t*>((char*)(&field2));
+                uint32_t v3 = *reinterpret_cast<uint32_t*>((char*)(&field3));
+                EXPECT_EQ(100 * v3 + k2, k1);
+                if (!skip_value_check) {
+                    // all v3%3==0 is deleted in all segments with an even number of ids.
+                    EXPECT_TRUE(k2 % 2 != 0 || v3 % 3 != 0);
+                }
+                num_rows_read++;
+            }
+        }
+        EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+        EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows);
+        EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted);
+
+        std::vector<uint32_t> segment_num_rows;
+        EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+        size_t total_num_rows = 0;
+        for (const auto& rows_in_segment : segment_num_rows) {
+            total_num_rows += rows_in_segment;
+        }
+        EXPECT_EQ(total_num_rows, expect_total_rows);
+        return true;
+    }
+
+private:
+    DataDir* _data_dir = nullptr;
+};
+
+// Writes 15 segments of 4096 rows with segcompaction enabled and checks that
+// the delete bitmap survives segment compaction. Param "full" marks every row
+// deleted; "partial" marks every third row (rid % 3 == 0) of even segments.
+TEST_P(SegCompactionMoWTest, SegCompactionThenRead) {
+    const std::string delete_ratio = GetParam();
+    const bool delete_all_rows = delete_ratio == "full";
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    const int num_segments = 15;
+    const uint32_t rows_per_segment = 4096;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 10;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    { // write `num_segments * rows_per_segment` rows to rowset
+        RowsetWriterContext writer_context;
+        int raw_rsid = rand();
+        create_rowset_writer_context(raw_rsid, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        // Check the status the factory actually returned; on failure the writer
+        // is unusable, so abort the test instead of dereferencing it.
+        s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        ASSERT_TRUE(s.ok());
+        ASSERT_TRUE(rowset_writer != nullptr);
+
+        // for segment "i", row "rid":
+        //   k1 := rid * 100 + i
+        //   k2 := i
+        //   k3 := rid
+        //   seq := 0
+        for (int i = 0; i < num_segments; ++i) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (int rid = 0; rid < rows_per_segment; ++rid) {
+                uint32_t k1 = rid * 100 + i;
+                uint32_t k2 = i;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // "full": delete all data; "partial": mark delete every 3 rows
+                // for segments whose seg_id is an even number
+                if (delete_all_rows || (i % 2 == 0 && rid % 3 == 0)) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            sleep(1);
+        }
+
+        // Iterate by reference: each entry holds a roaring bitmap.
+        size_t total_cardinality1 = 0;
+        for (const auto& entry : delete_bitmap->delete_bitmap) {
+            total_cardinality1 += entry.second.cardinality();
+        }
+        if (delete_all_rows) {
+            EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size());
+        } else {
+            EXPECT_EQ(num_segments / 2 + num_segments % 2, delete_bitmap->delete_bitmap.size());
+        }
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        // segcompaction is expected to leave 7 segment files: <rsid>_0.dat .. <rsid>_6.dat
+        std::vector<std::string> ls;
+        for (int segment = 0; segment < 7; ++segment) {
+            ls.push_back(fmt::format("{}_{}.dat", raw_rsid, segment));
+        }
+        EXPECT_TRUE(check_dir(ls));
+        // Sum the cardinality of the real segment entries, skipping the sentinel mark.
+        size_t total_cardinality2 = 0;
+        for (const auto& entry : delete_bitmap->delete_bitmap) {
+            if (std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID) {
+                continue;
+            }
+            total_cardinality2 += entry.second.cardinality();
+        }
+        if (delete_all_rows) {
+            // 7 segments + 1 sentinel mark
+            EXPECT_EQ(8, delete_bitmap->delete_bitmap.size());
+        } else {
+            EXPECT_EQ(5, delete_bitmap->delete_bitmap.size());
+        }
+        // compaction must neither lose nor invent delete marks
+        EXPECT_EQ(total_cardinality1, total_cardinality2);
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   num_segments * rows_per_segment,
+                                                   rows_mark_deleted));
+}
+
+// Writes small ("o", 4096 rows) and big ("O", 6400 rows) segments in the layout
+// ooooOOoOooooooooO. Only runs of small segments are segcompaction candidates.
+// The run of 8 small segments has overlapping keys, so compaction must merge
+// duplicates while keeping the delete bitmap consistent.
+TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    uint32_t total_written_rows = 0;
+    { // write segments in the layout "ooooOOoOooooooooO" to rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20048, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        // Check the status the factory actually returned; on failure the writer
+        // is unusable, so abort the test instead of dereferencing it.
+        s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        ASSERT_TRUE(s.ok());
+        ASSERT_TRUE(rowset_writer != nullptr);
+
+        int segid = 0;
+        // Writes `num_segments` segments of `rows_per_segment` rows each, flushing
+        // after every segment (and sleeping if `sleep_after_flush`, giving the
+        // background segcompaction time to run). For segment "segid", row "rid":
+        //   k1 := rid * 100 + segid, k2 := segid, k3 := rid, seq := 0
+        // Every third row (rid % 3 == 0) of an even-numbered segment is marked
+        // deleted and counted in `rows_mark_deleted`.
+        auto write_segments = [&](int num_segments, uint32_t rows_per_segment,
+                                  bool sleep_after_flush) {
+            for (int i = 0; i < num_segments; ++i) {
+                vectorized::Block block = tablet_schema->create_block();
+                auto columns = block.mutate_columns();
+                for (int rid = 0; rid < rows_per_segment; ++rid) {
+                    uint32_t k1 = rid * 100 + segid;
+                    uint32_t k2 = segid;
+                    uint32_t k3 = rid;
+                    uint32_t seq = 0;
+                    columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                    columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                    columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                    columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                    if (segid % 2 == 0 && rid % 3 == 0) {
+                        writer_context.mow_context->delete_bitmap->add(
+                                {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                        rows_mark_deleted++;
+                    }
+                }
+                s = rowset_writer->add_block(&block);
+                EXPECT_TRUE(s.ok());
+                s = rowset_writer->flush();
+                EXPECT_EQ(Status::OK(), s);
+                if (sleep_after_flush) {
+                    sleep(1);
+                }
+                segid++;
+                total_written_rows += rows_per_segment;
+            }
+        };
+
+        write_segments(4, 4096, false); // oooo
+        write_segments(2, 6400, false); // OO
+        write_segments(1, 4096, false); // o
+        write_segments(1, 6400, false); // O
+
+        // oooooooo: 8 small segments with overlapping keys
+        // (k1 := rid * 100 + rand_i, rand_i in [0, 5)), so segment compaction has
+        // duplicate rows to merge. Every third row is marked deleted in every
+        // segment here; deleted rows are counted after the loop from the unique keys.
+        const int num_dup_segments = 8;
+        const uint32_t rows_per_dup_segment = 4096;
+        std::map<uint32_t, uint32_t> unique_keys;
+        for (int i = 0; i < num_dup_segments; ++i) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (int rid = 0; rid < rows_per_dup_segment; ++rid) {
+                int rand_i = rand() % (num_dup_segments - 3);
+                uint32_t k1 = rid * 100 + rand_i;
+                uint32_t k2 = rand_i;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // mark delete every 3 rows
+                if (rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                }
+                unique_keys.emplace(k1, rid);
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            sleep(1);
+            segid++;
+        }
+        // these 8 segments should be compacted to 1 segment finally, so the rows
+        // finally written are the unique rows left after compaction
+        total_written_rows += unique_keys.size();
+        for (const auto& entry : unique_keys) {
+            if (entry.second % 3 == 0) {
+                rows_mark_deleted++;
+            }
+        }
+
+        write_segments(1, 6400, true); // O
+
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        // ooooOOoOooooooooO
+        ls.push_back("20048_0.dat"); // oooo
+        ls.push_back("20048_1.dat"); // O
+        ls.push_back("20048_2.dat"); // O
+        ls.push_back("20048_3.dat"); // o
+        ls.push_back("20048_4.dat"); // O
+        ls.push_back("20048_5.dat"); // oooooooo
+        ls.push_back("20048_6.dat"); // O
+        EXPECT_TRUE(check_dir(ls));
+        EXPECT_EQ(6, delete_bitmap->delete_bitmap.size());
+    }
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   total_written_rows, rows_mark_deleted, true));
+}
+
+// Writes alternating big ("O", 6400 rows) and small ("o", 4096 rows) segments
+// in the layout OoOoO. No run of small segments is long enough to compact, so
+// every segment is expected to stay a separate file.
+TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+    config::segcompaction_batch_size = 5;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    uint32_t total_written_rows = 0;
+    { // write segments in the layout "OoOoO" to rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20049, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        // Check the status the factory actually returned; on failure the writer
+        // is unusable, so abort the test instead of dereferencing it.
+        s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        ASSERT_TRUE(s.ok());
+        ASSERT_TRUE(rowset_writer != nullptr);
+
+        int segid = 0;
+        // Writes `num_segments` segments of `rows_per_segment` rows each, flushing
+        // after every segment (and sleeping if `sleep_after_flush`, giving the
+        // background segcompaction time to run). For segment "segid", row "rid":
+        //   k1 := rid * 100 + segid, k2 := segid, k3 := rid, seq := 0
+        // Every third row (rid % 3 == 0) of an even-numbered segment is marked
+        // deleted and counted in `rows_mark_deleted`.
+        auto write_segments = [&](int num_segments, uint32_t rows_per_segment,
+                                  bool sleep_after_flush) {
+            for (int i = 0; i < num_segments; ++i) {
+                vectorized::Block block = tablet_schema->create_block();
+                auto columns = block.mutate_columns();
+                for (int rid = 0; rid < rows_per_segment; ++rid) {
+                    uint32_t k1 = rid * 100 + segid;
+                    uint32_t k2 = segid;
+                    uint32_t k3 = rid;
+                    uint32_t seq = 0;
+                    columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                    columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                    columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                    columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                    if (segid % 2 == 0 && rid % 3 == 0) {
+                        writer_context.mow_context->delete_bitmap->add(
+                                {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                        rows_mark_deleted++;
+                    }
+                }
+                s = rowset_writer->add_block(&block);
+                EXPECT_TRUE(s.ok());
+                s = rowset_writer->flush();
+                EXPECT_EQ(Status::OK(), s);
+                if (sleep_after_flush) {
+                    sleep(1);
+                }
+                segid++;
+                total_written_rows += rows_per_segment;
+            }
+        };
+
+        write_segments(1, 6400, false); // O
+        write_segments(1, 4096, false); // o
+        write_segments(1, 6400, false); // O
+        write_segments(1, 4096, false); // o
+        write_segments(1, 6400, true);  // O
+
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        ls.push_back("20049_0.dat"); // O
+        ls.push_back("20049_1.dat"); // o
+        ls.push_back("20049_2.dat"); // O
+        ls.push_back("20049_3.dat"); // o
+        ls.push_back("20049_4.dat"); // O
+        EXPECT_TRUE(check_dir(ls));
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   total_written_rows, rows_mark_deleted));
+}
+
+// Writes 8 small segments, fewer than segcompaction_batch_size (10), so
+// segcompaction must not run. All 8 segment files and the original delete
+// bitmap entries are expected to survive unchanged.
+TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) {
+    config::enable_segcompaction = true;
+    Status s;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    const int num_segments = 8;
+    const uint32_t rows_per_segment = 4096;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above
+                                                     // rows_per_segment
+    config::segcompaction_batch_size = 10;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    // only even-numbered segments get delete marks: one bitmap entry per even seg id
+    const size_t expected_bitmap_entries = num_segments / 2 + num_segments % 2;
+    { // write `num_segments * rows_per_segment` rows to rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20050, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        // Check the status the factory actually returned; on failure the writer
+        // is unusable, so abort the test instead of dereferencing it.
+        s = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        ASSERT_TRUE(s.ok());
+        ASSERT_TRUE(rowset_writer != nullptr);
+
+        // for segment "i", row "rid":
+        //   k1 := rid * 100 + i
+        //   k2 := i
+        //   k3 := rid
+        //   seq := 0
+        for (int i = 0; i < num_segments; ++i) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (int rid = 0; rid < rows_per_segment; ++rid) {
+                uint32_t k1 = rid * 100 + i;
+                uint32_t k2 = i;
+                uint32_t k3 = rid;
+                uint32_t seq = 0;
+                columns[0]->insert_data((const char*)&k1, sizeof(k1));
+                columns[1]->insert_data((const char*)&k2, sizeof(k2));
+                columns[2]->insert_data((const char*)&k3, sizeof(k3));
+                columns[3]->insert_data((const char*)&seq, sizeof(seq));
+                // mark delete every 3 rows, for segments that seg_id is even number
+                if (i % 2 == 0 && rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            sleep(1);
+        }
+
+        EXPECT_EQ(expected_bitmap_entries, delete_bitmap->delete_bitmap.size());
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        // no compaction: every written segment keeps its own file
+        std::vector<std::string> ls;
+        for (int segment = 0; segment < num_segments; ++segment) {
+            ls.push_back(fmt::format("20050_{}.dat", segment));
+        }
+        EXPECT_TRUE(check_dir(ls));
+        EXPECT_EQ(expected_bitmap_entries, delete_bitmap->delete_bitmap.size());
+
+        EXPECT_FALSE(static_cast<BetaRowsetWriter*>(rowset_writer.get())->is_segcompacted());
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   num_segments * rows_per_segment,
+                                                   rows_mark_deleted));
+}
+
+// Runs each TEST_P of SegCompactionMoWTest once per delete ratio:
+// "partial" (some rows marked deleted) and "full" (every row marked deleted).
+INSTANTIATE_TEST_SUITE_P(Params, SegCompactionMoWTest,
+                         ::testing::Values(std::string("partial"), std::string("full")));
+
+} // namespace doris
+
+// @brief Test Stub


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to