This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new c31344667ca [fix](merge-on-write) segcompaction should process delete bitmap if necessary (#38369) (#39749) c31344667ca is described below commit c31344667caf98b1960df64ec5450f36f7ea4884 Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com> AuthorDate: Thu Aug 22 14:22:54 2024 +0800 [fix](merge-on-write) segcompaction should process delete bitmap if necessary (#38369) (#39749) cherry-pick #38369 and #38800 --- be/src/olap/merger.cpp | 19 +- be/src/olap/merger.h | 6 +- be/src/olap/rowset/beta_rowset_writer.cpp | 35 +- be/src/olap/rowset/beta_rowset_writer.h | 3 +- be/src/olap/rowset/segcompaction.cpp | 72 ++- be/src/olap/rowset/segcompaction.h | 13 + be/src/olap/simple_rowid_conversion.h | 84 +++ be/src/vec/olap/vertical_block_reader.cpp | 1 + be/test/olap/segcompaction_mow_test.cpp | 907 ++++++++++++++++++++++++++++++ 9 files changed, 1123 insertions(+), 17 deletions(-) diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 6bcdb2206c5..164b38dcb7d 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -283,16 +283,12 @@ Status Merger::vertical_compact_one_group( } // for segcompaction -Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr tablet_schema, bool is_key, - const std::vector<uint32_t>& column_group, - vectorized::RowSourcesBuffer* row_source_buf, - vectorized::VerticalBlockReader& src_block_reader, - segment_v2::SegmentWriter& dst_segment_writer, - int64_t max_rows_per_segment, Statistics* stats_output, - uint64_t* index_size, KeyBoundsPB& key_bounds) { - // build tablet reader - VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; +Status Merger::vertical_compact_one_group( + TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key, + const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf, + vectorized::VerticalBlockReader& src_block_reader, + segment_v2::SegmentWriter& dst_segment_writer, Statistics* stats_output, + uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion) { // TODO: record_rowids vectorized::Block block = tablet_schema->create_block(column_group); size_t output_rows = 0; @@ -309,6 +305,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea dst_segment_writer.append_block(&block, 0, block.rows()), "failed to write block when merging rowsets of tablet " + tablet->full_name()); + if (is_key && rowid_conversion != nullptr) { + rowid_conversion->add(src_block_reader.current_block_row_locations()); + } output_rows += block.rows(); block.clear_column_data(); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 37291c548c3..6221dabc8b2 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "io/io_common.h" #include "olap/rowset/rowset_reader.h" +#include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" @@ -84,8 +85,9 @@ public: vectorized::RowSourcesBuffer* row_source_buf, vectorized::VerticalBlockReader& src_block_reader, segment_v2::SegmentWriter& dst_segment_writer, - int64_t max_rows_per_segment, Statistics* stats_output, - uint64_t* index_size, KeyBoundsPB& key_bounds); + Statistics* stats_output, uint64_t* index_size, + KeyBoundsPB& key_bounds, + SimpleRowIdConversion* rowid_conversion); }; } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 57e45b6f83b..31e3e628365 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -191,7 +191,12 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( if (is_large_segment) { if (segid == _segcompacted_point) { // skip large segments at the front + auto dst_seg_id = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker.need_convert_delete_bitmap()) { + _segcompaction_worker.convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, segid, dst_seg_id); + } continue; } else { // stop because we need consecutive segments @@ -216,7 +221,13 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( } if (s == 1) { // poor bachelor, let it go VLOG_DEBUG << "only one candidate segment"; + auto src_seg_id = _segcompacted_point.load(); + auto dst_seg_id = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker.need_convert_delete_bitmap()) { + _segcompaction_worker.convert_segment_delete_bitmap(_context.mow_context->delete_bitmap, + src_seg_id, dst_seg_id); + } segments->clear(); return Status::OK(); } @@ -374,7 +385,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { return Status::Error<SEGCOMPACTION_FAILED>( "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state"); } - if (!_is_segcompacted() || _segcompacted_point == _num_segment) { + if (!is_segcompacted() || _segcompacted_point == _num_segment) { // no need if never segcompact before or all segcompacted return Status::OK(); } @@ -382,7 +393,12 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { // so that transaction can be committed ASAP VLOG_DEBUG << "segcompaction last few segments"; for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { + auto dst_segid = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker.need_convert_delete_bitmap()) { + _segcompaction_worker.convert_segment_delete_bitmap(_context.mow_context->delete_bitmap, + segid, dst_segid); + } } return Status::OK(); } @@ -543,6 +559,21 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(), "close segment compaction worker failed"); } + // process delete bitmap for mow table + if (is_segcompacted() && _segcompaction_worker.need_convert_delete_bitmap()) { + auto converted_delete_bitmap = _segcompaction_worker.get_converted_delete_bitmap(); + // which means the segment compaction is triggerd + if (converted_delete_bitmap != nullptr) { + RowsetIdUnorderedSet rowsetids; + rowsetids.insert(rowset_id()); + auto tablet = static_cast<Tablet*>(_context.tablet.get()); + tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), + rowsetids); + _context.mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, + {rowset_id(), UINT32_MAX, INT64_MAX}); + _context.mow_context->delete_bitmap->merge(*converted_delete_bitmap); + } + } } // When building a rowset, we must ensure that the current _segment_writer has been // flushed, that is, the current _segment_writer is nullptr @@ -612,7 +643,7 @@ void BetaRowsetWriter::_build_rowset_meta_with_spec_field( Status BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta, bool check_segment_num) { - int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment; + int64_t num_seg = is_segcompacted() ? _num_segcompacted : _num_segment; int64_t num_rows_written = 0; int64_t total_data_size = 0; int64_t total_index_size = 0; diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 38f9eb7923d..90135b2f0fc 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -134,6 +134,8 @@ public: return _context.partial_update_info && _context.partial_update_info->is_partial_update; } + bool is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } + private: Status _do_add_block(const vectorized::Block* block, std::unique_ptr<segment_v2::SegmentWriter>* segment_writer, @@ -155,7 +157,6 @@ private: Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); - bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; } bool _check_and_set_is_doing_segcompaction(); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 598a78326ed..d134f87e8b9 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -75,11 +75,14 @@ Status SegcompactionWorker::_get_segcompaction_reader( std::vector<uint32_t>& return_columns, std::unique_ptr<vectorized::VerticalBlockReader>* reader) { auto ctx = _writer->_context; + bool record_rowids = need_convert_delete_bitmap() && is_key; StorageReadOptions read_options; read_options.stats = stat; read_options.use_page_cache = false; read_options.tablet_schema = ctx.tablet_schema; + read_options.record_rowids = record_rowids; std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; + std::map<uint32_t, uint32_t> segment_rows; for (auto& seg_ptr : *segments) { std::unique_ptr<RowwiseIterator> iter; auto s = seg_ptr->new_iterator(schema, read_options, &iter); @@ -88,6 +91,10 @@ Status SegcompactionWorker::_get_segcompaction_reader( s.to_string()); } seg_iterators.push_back(std::move(iter)); + segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows()); + } + if (record_rowids && _rowid_conversion != nullptr) { + _rowid_conversion->reset_segment_map(segment_rows); } *reader = std::unique_ptr<vectorized::VerticalBlockReader> { @@ -102,6 +109,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; reader_params.use_page_cache = false; + reader_params.record_rowids = record_rowids; return (*reader)->init(reader_params); } @@ -219,6 +227,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt DCHECK(ctx.tablet); auto tablet = ctx.tablet; + if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) { + _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id()); + } std::vector<std::vector<uint32_t>> column_groups; Merger::vertical_split_columns(ctx.tablet_schema, &column_groups); @@ -248,8 +259,8 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt Merger::Statistics merger_stats; RETURN_IF_ERROR(Merger::vertical_compact_one_group( tablet, ReaderType::READER_SEGMENT_COMPACTION, ctx.tablet_schema, is_key, - column_ids, &row_sources_buf, *reader, *writer, INT_MAX, &merger_stats, &index_size, - key_bounds)); + column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size, + key_bounds, _rowid_conversion.get())); total_index_size += index_size; if (is_key) { row_sources_buf.flush(); @@ -275,6 +286,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt } RETURN_IF_ERROR(_delete_original_segments(begin, end)); + if (_rowid_conversion != nullptr) { + convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, + _writer->_num_segcompacted); + } RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end)); if (VLOG_DEBUG_IS_ON) { @@ -332,4 +347,57 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm } } +bool SegcompactionWorker::need_convert_delete_bitmap() { + if (_writer == nullptr) { + return false; + } + auto tablet = _writer->_context.tablet; + return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && + tablet->enable_unique_key_merge_on_write() && + tablet->tablet_schema()->has_sequence_col(); +} + +void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, + uint32_t src_seg_id, uint32_t dest_seg_id) { + // lazy init + if (nullptr == _converted_delete_bitmap) { + _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->_context.tablet_id); + } + auto rowset_id = _writer->_context.rowset_id; + const auto* seg_map = + src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); + if (seg_map != nullptr) { + _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, + *seg_map); + } +} + +void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, + uint32_t src_begin, uint32_t src_end, + uint32_t dst_seg_id) { + // lazy init + if (nullptr == _converted_delete_bitmap) { + _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->_context.tablet_id); + } + auto rowset_id = _writer->_context.rowset_id; + RowLocation src(rowset_id, 0, 0); + for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { + const auto* seg_map = + src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); + if (!seg_map) { + continue; + } + src.segment_id = seg_id; + for (unsigned int row_id : *seg_map) { + src.row_id = row_id; + auto dst_row_id = _rowid_conversion->get(src); + if (dst_row_id < 0) { + continue; + } + _converted_delete_bitmap->add( + {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id); + } + } +} + } // namespace doris diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 273fbdec560..5f832f664e4 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/merger.h" +#include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "segment_v2/segment.h" @@ -52,6 +53,14 @@ public: void compact_segments(SegCompactionCandidatesSharedPtr segments); + bool need_convert_delete_bitmap(); + + void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_seg_id, + uint32_t dest_seg_id); + void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_begin, + uint32_t src_end, uint32_t dest_seg_id); + DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; } + io::FileWriterPtr& get_file_writer() { return _file_writer; } // set the cancel flag, tasks already started will not be cancelled. @@ -78,5 +87,9 @@ private: BetaRowsetWriter* _writer; io::FileWriterPtr _file_writer; std::atomic<bool> _cancelled = false; + + // for unique key mow table + std::unique_ptr<SimpleRowIdConversion> _rowid_conversion; + DeleteBitmapPtr _converted_delete_bitmap; }; } // namespace doris diff --git a/be/src/olap/simple_rowid_conversion.h b/be/src/olap/simple_rowid_conversion.h new file mode 100644 index 00000000000..1a89b01838f --- /dev/null +++ b/be/src/olap/simple_rowid_conversion.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <map> +#include <vector> + +#include "olap/olap_common.h" +#include "olap/utils.h" + +namespace doris { + +// Simple verion of rowid conversion, for segcompaction +// convert rows from several segments to rows in 1 segment +class SimpleRowIdConversion { +public: + SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {}; + ~SimpleRowIdConversion() = default; + + // resize segment rowid map to its rows num + void reset_segment_map(const std::map<uint32_t, uint32_t>& num_rows) { + _cur_dst_segment_rowid = 0; + for (auto seg_rows : num_rows) { + _segments_rowid_map.emplace(seg_rows.first, + std::vector<uint32_t>(seg_rows.second, UINT32_MAX)); + } + } + + // add row id to the map + void add(const std::vector<RowLocation>& rss_row_ids) { + for (auto& item : rss_row_ids) { + if (item.row_id == -1) { + continue; + } + DCHECK(_segments_rowid_map.find(item.segment_id) != _segments_rowid_map.end() && + _segments_rowid_map[item.segment_id].size() > item.row_id); + _segments_rowid_map[item.segment_id][item.row_id] = _cur_dst_segment_rowid++; + } + } + + // get destination RowLocation + // return non-zero if the src RowLocation does not exist + int get(const RowLocation& src) const { + auto it = _segments_rowid_map.find(src.segment_id); + if (it == _segments_rowid_map.end()) { + return -1; + } + const auto& rowid_map = it->second; + if (src.row_id >= rowid_map.size() || UINT32_MAX == rowid_map[src.row_id]) { + return -1; + } + + return rowid_map[src.row_id]; + } + +private: + // key: index indicates src segment. + // value: index indicates row id of source segment, value indicates row id of destination + // segment. UINT32_MAX indicates current row not exist. + std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map; + + // dst rowset id + RowsetId _rowst_id; + + // current rowid of dst segment + std::uint32_t _cur_dst_segment_rowid = 0; +}; + +} // namespace doris diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index f1f4f3a3895..ee2f52462f8 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -120,6 +120,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) _reader_context.need_ordered_result = true; // TODO: should it be? _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS; _reader_context.is_key_column_group = read_params.is_key_column_group; + _reader_context.record_rowids = read_params.record_rowids; } // build heap if key column iterator or build vertical merge iterator if value column diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp new file mode 100644 index 00000000000..743b0c2a62e --- /dev/null +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -0,0 +1,907 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <memory> +#include <sstream> +#include <string> +#include <vector> + +#include "common/config.h" +#include "gen_cpp/AgentService_types.h" +#include "gen_cpp/olap_file.pb.h" +#include "io/fs/local_file_system.h" +#include "olap/data_dir.h" +#include "olap/row_cursor.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_reader_context.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/storage_engine.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" +#include "olap/utils.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "util/slice.h" + +namespace doris { +using namespace ErrorCode; + +static const uint32_t MAX_PATH_LEN = 1024; +static const uint32_t TABLET_ID = 12345; +static StorageEngine* s_engine; +static const std::string lTestDir = "./data_test/data/segcompaction_mow_test"; + +class SegCompactionMoWTest : public ::testing::TestWithParam<std::string> { +public: + SegCompactionMoWTest() = default; + + void SetUp() { + config::enable_segcompaction = true; + config::tablet_map_shard_size = 1; + config::txn_map_shard_size = 1; + config::txn_shard_size = 1; + + char buffer[MAX_PATH_LEN]; + EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + config::storage_root_path = std::string(buffer) + "/data_test"; + + auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(config::storage_root_path); + ASSERT_TRUE(st.ok()) << st; + + std::vector<StorePath> paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + Status s = doris::StorageEngine::open(options, &s_engine); + EXPECT_TRUE(s.ok()) << s.to_string(); + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_storage_engine(s_engine); + _data_dir = new DataDir(lTestDir, 1000000000); + static_cast<void>(_data_dir->init()); + static_cast<void>(_data_dir->update_capacity()); + + EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok()); + + s = s_engine->start_bg_threads(); + EXPECT_TRUE(s.ok()) << s.to_string(); + } + + void TearDown() { + SAFE_DELETE(_data_dir); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(lTestDir).ok()); + if (s_engine != nullptr) { + s_engine->stop(); + delete s_engine; + s_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } + config::enable_segcompaction = false; + } + +protected: + OlapReaderStatistics _stats; + + bool check_dir(std::vector<std::string>& vec) { + std::vector<std::string> result; + for (const auto& entry : std::filesystem::directory_iterator(lTestDir)) { + result.push_back(std::filesystem::path(entry.path()).filename()); + } + + LOG(INFO) << "expected ls:" << std::endl; + for (auto& i : vec) { + LOG(INFO) << i; + } + LOG(INFO) << "acutal ls:" << std::endl; + for (auto& i : result) { + LOG(INFO) << i; + } + + if (result.size() != vec.size()) { + return false; + } else { + for (auto& i : vec) { + if (std::find(result.begin(), result.end(), i) == result.end()) { + return false; + } + } + } + return true; + } + + // (k1 int, k2 varchar(20), k3 int) keys (k1, k2) + void create_tablet_schema(TabletSchemaSPtr tablet_schema) { + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(UNIQUE_KEYS); + tablet_schema_pb.set_num_short_key_columns(2); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(5); + // add seq column so that segcompaction will process delete bitmap + tablet_schema_pb.set_sequence_col_idx(3); + + ColumnPB* column_1 = tablet_schema_pb.add_column(); + column_1->set_unique_id(1); + column_1->set_name("k1"); + column_1->set_type("INT"); + column_1->set_is_key(true); + column_1->set_length(4); + column_1->set_index_length(4); + column_1->set_is_nullable(true); + column_1->set_is_bf_column(false); + + ColumnPB* column_2 = tablet_schema_pb.add_column(); + column_2->set_unique_id(2); + column_2->set_name("k2"); + column_2->set_type( + "INT"); // TODO change to varchar(20) when dict encoding for string is supported + column_2->set_length(4); + column_2->set_index_length(4); + column_2->set_is_nullable(true); + column_2->set_is_key(true); + column_2->set_is_nullable(true); + column_2->set_is_bf_column(false); + + ColumnPB* v_column = tablet_schema_pb.add_column(); + v_column->set_unique_id(3); + v_column->set_name(fmt::format("v1")); + v_column->set_type("INT"); + v_column->set_length(4); + v_column->set_is_key(false); + v_column->set_is_nullable(false); + v_column->set_is_bf_column(false); + v_column->set_default_value(std::to_string(10)); + v_column->set_aggregation("NONE"); + + ColumnPB* seq_column = tablet_schema_pb.add_column(); + seq_column->set_unique_id(4); + seq_column->set_name(SEQUENCE_COL); + seq_column->set_type("INT"); + seq_column->set_length(4); + seq_column->set_is_key(false); + seq_column->set_is_nullable(false); + seq_column->set_is_bf_column(false); + seq_column->set_default_value(std::to_string(10)); + seq_column->set_aggregation("NONE"); + + tablet_schema->init_from_pb(tablet_schema_pb); + } + + // use different id to avoid conflict + void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema, + RowsetWriterContext* rowset_writer_context) { + RowsetId rowset_id; + rowset_id.init(id); + // rowset_writer_context->data_dir = _data_dir.get(); + rowset_writer_context->rowset_id = rowset_id; + rowset_writer_context->tablet_id = TABLET_ID; + rowset_writer_context->tablet_schema_hash = 1111; + rowset_writer_context->partition_id = 10; + rowset_writer_context->rowset_type = BETA_ROWSET; + rowset_writer_context->rowset_dir = lTestDir; + rowset_writer_context->rowset_state = VISIBLE; + rowset_writer_context->tablet_schema = tablet_schema; + rowset_writer_context->version.first = 10; + rowset_writer_context->version.second = 10; + + TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>(); + tablet_meta->_tablet_id = TABLET_ID; + static_cast<void>(tablet_meta->set_partition_id(10000)); + tablet_meta->_schema = tablet_schema; + tablet_meta->_enable_unique_key_merge_on_write = true; + auto tablet = std::make_shared<Tablet>(tablet_meta, _data_dir, "test_str"); + tablet->init(); + // tablet->key + rowset_writer_context->tablet = tablet; + } + + void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context, + RowsetReaderSharedPtr* result) { + auto s = rowset->create_reader(result); + EXPECT_EQ(Status::OK(), s); + EXPECT_TRUE(*result != nullptr); + + s = (*result)->init(&context); + EXPECT_EQ(Status::OK(), s); + } + + bool check_data_read_with_delete_bitmap(TabletSchemaSPtr tablet_schema, + DeleteBitmapPtr delete_bitmap, RowsetSharedPtr rowset, + int expect_total_rows, int rows_mark_deleted, + bool skip_value_check = false) { + RowsetReaderContext reader_context; + reader_context.tablet_schema = tablet_schema; + // use this type to avoid cache from other ut + reader_context.reader_type = ReaderType::READER_QUERY; + reader_context.need_ordered_result = true; + std::vector<uint32_t> return_columns = {0, 1, 2}; + reader_context.return_columns = &return_columns; + reader_context.stats = &_stats; + reader_context.delete_bitmap = delete_bitmap.get(); + + std::vector<uint32_t> segment_num_rows; + Status s; + + // without predicates + { + RowsetReaderSharedPtr rowset_reader; + create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader); + + uint32_t num_rows_read = 0; + bool eof = false; + while (!eof) { + std::shared_ptr<vectorized::Block> output_block = + std::make_shared<vectorized::Block>( + tablet_schema->create_block(return_columns)); + s = rowset_reader->next_block(output_block.get()); + if (s != Status::OK()) { + eof = true; + } + EXPECT_EQ(return_columns.size(), output_block->columns()); + for (int i = 0; i < output_block->rows(); ++i) { + vectorized::ColumnPtr col0 = output_block->get_by_position(0).column; + vectorized::ColumnPtr col1 = output_block->get_by_position(1).column; + vectorized::ColumnPtr col2 = output_block->get_by_position(2).column; + auto field1 = (*col0)[i]; + auto field2 = (*col1)[i]; + auto field3 = (*col2)[i]; + uint32_t k1 = *reinterpret_cast<uint32_t*>((char*)(&field1)); + uint32_t k2 = *reinterpret_cast<uint32_t*>((char*)(&field2)); + uint32_t v3 = *reinterpret_cast<uint32_t*>((char*)(&field3)); + EXPECT_EQ(100 * v3 + k2, k1); + if (!skip_value_check) { + // all v3%3==0 is deleted in all segments with an even number of ids. + EXPECT_TRUE(k2 % 2 != 0 || v3 % 3 != 0); + } + num_rows_read++; + } + output_block->clear(); + } + EXPECT_EQ(Status::Error<END_OF_FILE>(""), s); + EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows); + EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted); + EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + size_t total_num_rows = 0; + for (const auto& i : segment_num_rows) { + total_num_rows += i; + } + EXPECT_EQ(total_num_rows, expect_total_rows); + } + return true; + } + +private: + DataDir* _data_dir = nullptr; +}; + +TEST_P(SegCompactionMoWTest, SegCompactionThenRead) { + std::string delete_ratio = GetParam(); + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 15; + const uint32_t rows_per_segment = 4096; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 10; + std::vector<uint32_t> segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID); + uint32_t rows_mark_deleted = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + int raw_rsid = rand(); + create_rowset_writer_context(raw_rsid, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector<RowsetSharedPtr> rowset_ptrs; + writer_context.mow_context = + std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + std::unique_ptr<RowsetWriter> rowset_writer; + auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); + EXPECT_TRUE(s.ok()); + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + if (delete_ratio == "full") { // delete all data + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } else { + // mark delete every 3 rows, for segments that seg_id is even number + if (i % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + } + + size_t total_cardinality1 = 0; + for (auto entry : delete_bitmap->delete_bitmap) { + total_cardinality1 += entry.second.cardinality(); + } + if (delete_ratio == "full") { + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + } else { + EXPECT_EQ(num_segments / 2 + num_segments % 2, delete_bitmap->delete_bitmap.size()); + } + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector<std::string> ls; + ls.push_back(fmt::format("{}_0.dat", raw_rsid)); + ls.push_back(fmt::format("{}_1.dat", raw_rsid)); + ls.push_back(fmt::format("{}_2.dat", raw_rsid)); + ls.push_back(fmt::format("{}_3.dat", raw_rsid)); + ls.push_back(fmt::format("{}_4.dat", raw_rsid)); + ls.push_back(fmt::format("{}_5.dat", raw_rsid)); + ls.push_back(fmt::format("{}_6.dat", raw_rsid)); + EXPECT_TRUE(check_dir(ls)); + // 7 segments plus 1 sentinel mark + size_t total_cardinality2 = 0; + for (auto entry : delete_bitmap->delete_bitmap) { + if (std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID) { + continue; + } + total_cardinality2 += entry.second.cardinality(); + } + if (delete_ratio == "full") { + // 7 segments + 1 sentinel mark + EXPECT_EQ(8, delete_bitmap->delete_bitmap.size()); + } else { + EXPECT_EQ(5, delete_bitmap->delete_bitmap.size()); + } + EXPECT_EQ(total_cardinality1, total_cardinality2); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + num_segments * rows_per_segment, + rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID); + uint32_t rows_mark_deleted = 0; + uint32_t total_written_rows = 0; + std::vector<uint32_t> segment_num_rows; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20048, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector<RowsetSharedPtr> rowset_ptrs; + writer_context.mow_context = + std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + std::unique_ptr<RowsetWriter> rowset_writer; + auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); + EXPECT_TRUE(s.ok()); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := 4096 * i + rid + int num_segments = 4; + uint32_t rows_per_segment = 4096; + int segid = 0; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 2; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 8; + rows_per_segment = 4096; + std::map<uint32_t, uint32_t> unique_keys; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + // generate some duplicate rows, segment compaction will merge them + int rand_i = rand() % (num_segments - 3); + uint32_t k1 = rid * 100 + rand_i; + uint32_t k2 = rand_i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + } + unique_keys.emplace(k1, rid); + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + } + // these 8 segments should be compacted to 1 segment finally + // so the finally written rows should be the unique rows after compaction + total_written_rows += unique_keys.size(); + for (auto entry : unique_keys) { + if (entry.second % 3 == 0) { + rows_mark_deleted++; + } + } + + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + total_written_rows += rows_per_segment; + } + + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector<std::string> ls; + // ooooOOoOooooooooO + ls.push_back("20048_0.dat"); // oooo + ls.push_back("20048_1.dat"); // O + ls.push_back("20048_2.dat"); // O + ls.push_back("20048_3.dat"); // o + ls.push_back("20048_4.dat"); // O + ls.push_back("20048_5.dat"); // oooooooo + ls.push_back("20048_6.dat"); // O + EXPECT_TRUE(check_dir(ls)); + EXPECT_EQ(6, delete_bitmap->delete_bitmap.size()); + } + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + total_written_rows, rows_mark_deleted, true)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + config::segcompaction_batch_size = 5; + std::vector<uint32_t> segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID); + uint32_t rows_mark_deleted = 0; + uint32_t total_written_rows = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20049, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector<RowsetSharedPtr> rowset_ptrs; + writer_context.mow_context = + std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + std::unique_ptr<RowsetWriter> rowset_writer; + auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); + EXPECT_TRUE(s.ok()); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := 4096 * i + rid + int num_segments = 1; + uint32_t rows_per_segment = 6400; + int segid = 0; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + segid; + uint32_t k2 = segid; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (segid % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + total_written_rows += rows_per_segment; + } + + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector<std::string> ls; + ls.push_back("20049_0.dat"); // O + ls.push_back("20049_1.dat"); // o + ls.push_back("20049_2.dat"); // O + ls.push_back("20049_3.dat"); // o + ls.push_back("20049_4.dat"); // O + EXPECT_TRUE(check_dir(ls)); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + total_written_rows, rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 8; + const uint32_t rows_per_segment = 4096; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 10; + std::vector<uint32_t> segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID); + uint32_t rows_mark_deleted = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20050, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector<RowsetSharedPtr> rowset_ptrs; + writer_context.mow_context = + std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + std::unique_ptr<RowsetWriter> rowset_writer; + auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer); + EXPECT_TRUE(s.ok()); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows, for segments that seg_id is even number + if (i % 2 == 0 && rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + } + + EXPECT_EQ(num_segments / 2 + num_segments % 2, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector<std::string> ls; + ls.push_back("20050_0.dat"); + ls.push_back("20050_1.dat"); + ls.push_back("20050_2.dat"); + ls.push_back("20050_3.dat"); + ls.push_back("20050_4.dat"); + ls.push_back("20050_5.dat"); + ls.push_back("20050_6.dat"); + ls.push_back("20050_7.dat"); + EXPECT_TRUE(check_dir(ls)); + EXPECT_EQ(num_segments / 2 + num_segments % 2, delete_bitmap->delete_bitmap.size()); + + EXPECT_FALSE(static_cast<BetaRowsetWriter*>(rowset_writer.get())->is_segcompacted()); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + num_segments * rows_per_segment, + rows_mark_deleted)); +} + +INSTANTIATE_TEST_SUITE_P(Params, SegCompactionMoWTest, + ::testing::ValuesIn(std::vector<std::string> {"partial", "full"})); + +} // namespace doris + +// @brief Test Stub --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org