This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 66975825f6d branch-3.0: [test](index compaction)Add exception cases for index compaction #45056 (#45192) 66975825f6d is described below commit 66975825f6d4cd6aeee20a3d40580c03b7d63c41 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Dec 11 19:18:22 2024 +0800 branch-3.0: [test](index compaction)Add exception cases for index compaction #45056 (#45192) Cherry-picked from #45056 Co-authored-by: qiye <l...@selectdb.com> --- .../compaction/index_compaction_test.cpp | 963 ++++++++++++++------- .../index_compaction_with_deleted_term.cpp | 671 -------------- .../compaction/util/index_compaction_utils.cpp | 621 +++++++++++++ 3 files changed, 1248 insertions(+), 1007 deletions(-) diff --git a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp index 0f1b27fd4fa..264786570e7 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp @@ -15,14 +15,9 @@ // specific language governing permissions and limitations // under the License. -#include <gtest/gtest.h> +#include <gmock/gmock.h> -#include "olap/cumulative_compaction.h" -#include "olap/rowset/beta_rowset_writer.h" -#include "olap/rowset/rowset_factory.h" -#include "olap/rowset/segment_v2/inverted_index/query/query_factory.h" -#include "olap/rowset/segment_v2/inverted_index_file_reader.h" -#include "olap/storage_engine.h" +#include "util/index_compaction_utils.cpp" namespace doris { @@ -31,14 +26,6 @@ using namespace doris::vectorized; constexpr static uint32_t MAX_PATH_LEN = 1024; constexpr static std::string_view dest_dir = "./ut_dir/inverted_index_test"; constexpr static std::string_view tmp_dir = "./ut_dir/tmp"; -static int64_t inc_id = 1000; - -struct DataRow { - int key; - std::string word; - std::string url; - int num; -}; class IndexCompactionTest : public ::testing::Test { protected: @@ -46,8 +33,8 @@ protected: // absolute dir char buffer[MAX_PATH_LEN]; EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); - _curreent_dir = std::string(buffer); - _absolute_dir = _curreent_dir + std::string(dest_dir); + _current_dir = std::string(buffer); + _absolute_dir = _current_dir + std::string(dest_dir); EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok()); @@ -57,7 +44,8 @@ protected: std::vector<StorePath> paths; paths.emplace_back(std::string(tmp_dir), 1024000000); auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths); - EXPECT_TRUE(tmp_file_dirs->init().ok()); + Status st = tmp_file_dirs->init(); + EXPECT_TRUE(st.ok()) << st.to_json(); ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs)); // storage engine @@ -73,24 +61,22 @@ protected: schema_pb.set_keys_type(KeysType::DUP_KEYS); schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0, - "INT", "key"); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1, - "STRING", "v1"); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, "v2_index", 2, - "STRING", "v2", true); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, "v3_index", 3, "INT", - "v3"); - - _tablet_schema.reset(new TabletSchema); + IndexCompactionUtils::construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, + "key_index", 0, "INT", "key"); + IndexCompactionUtils::construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, + "v1_index", 1, "STRING", "v1"); + IndexCompactionUtils::construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, + "v2_index", 2, "STRING", "v2", true); + IndexCompactionUtils::construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, + "v3_index", 3, "INT", "v3"); + _tablet_schema = std::make_shared<TabletSchema>(); _tablet_schema->init_from_pb(schema_pb); // tablet TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema)); - _tablet.reset(new Tablet(*_engine_ref, tablet_meta, _data_dir.get())); + _tablet = std::make_shared<Tablet>(*_engine_ref, tablet_meta, _data_dir.get()); EXPECT_TRUE(_tablet->init().ok()); - config::inverted_index_compaction_enable = true; } void TearDown() override { EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); @@ -100,41 +86,6 @@ protected: ExecEnv::GetInstance()->set_storage_engine(nullptr); } - void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id, - const std::string& index_name, int32_t col_unique_id, - const std::string& column_type, const std::string& column_name, - bool parser = false) { - column_pb->set_unique_id(col_unique_id); - column_pb->set_name(column_name); - column_pb->set_type(column_type); - column_pb->set_is_key(false); - column_pb->set_is_nullable(true); - tablet_index->set_index_id(index_id); - tablet_index->set_index_name(index_name); - tablet_index->set_index_type(IndexType::INVERTED); - tablet_index->add_col_unique_id(col_unique_id); - if (parser) { - auto* properties = tablet_index->mutable_properties(); - (*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE; - } - } - - RowsetWriterContext rowset_writer_context() { - RowsetWriterContext context; - RowsetId rowset_id; - rowset_id.init(inc_id); - context.rowset_id = rowset_id; - context.rowset_type = BETA_ROWSET; - context.data_dir = _data_dir.get(); - context.rowset_state = VISIBLE; - context.tablet_schema = _tablet_schema; - context.tablet_path = _tablet->tablet_path(); - context.version = Version(inc_id, inc_id); - context.max_rows_per_segment = 200; - inc_id++; - return context; - } - IndexCompactionTest() = default; ~IndexCompactionTest() override = default; @@ -144,300 +95,640 @@ private: std::unique_ptr<DataDir> _data_dir = nullptr; TabletSharedPtr _tablet = nullptr; std::string _absolute_dir; - std::string _curreent_dir; + std::string _current_dir; }; -std::vector<DataRow> read_data(const std::string file_name) { - std::ifstream file(file_name); - EXPECT_TRUE(file.is_open()); - - std::string line; - std::vector<DataRow> data; - - while (std::getline(file, line)) { - std::stringstream ss(line); - std::string item; - DataRow row; - EXPECT_TRUE(std::getline(ss, item, ',')); - row.key = std::stoi(item); - EXPECT_TRUE(std::getline(ss, item, ',')); - row.word = item; - EXPECT_TRUE(std::getline(ss, item, ',')); - row.url = item; - EXPECT_TRUE(std::getline(ss, item, ',')); - row.num = std::stoi(item); - data.emplace_back(std::move(row)); - } +TEST_F(IndexCompactionTest, tes_write_index_normally) { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 2); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(1)); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); + + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + auto custom_check_normal = [](const BaseCompaction& compaction, + const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 0); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_normal; + st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, false, + output_rowset_normal, custom_check_normal); + EXPECT_TRUE(st.ok()) << st.to_string(); + const auto& seg_path_normal = output_rowset_normal->segment_path(0); + EXPECT_TRUE(seg_path_normal.has_value()) << seg_path_normal.error(); + auto inverted_index_file_reader_normal = IndexCompactionUtils::init_index_file_reader( + output_rowset_normal, seg_path_normal.value(), + _tablet_schema->get_inverted_index_storage_format()); + // check index file terms + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + auto dir_normal_compaction = inverted_index_file_reader_normal->_open(10001, ""); + EXPECT_TRUE(dir_normal_compaction.has_value()) << dir_normal_compaction.error(); + std::ostringstream oss; + IndexCompactionUtils::check_terms_stats(dir_idx_compaction->get(), oss); + std::string output = oss.str(); + EXPECT_EQ(output, expected_output); + oss.str(""); + oss.clear(); + IndexCompactionUtils::check_terms_stats(dir_normal_compaction->get(), oss); + output = oss.str(); + EXPECT_EQ(output, expected_output); + + st = IndexCompactionUtils::check_idx_file_correctness(dir_idx_compaction->get(), + dir_normal_compaction->get()); + EXPECT_TRUE(st.ok()) << st.to_string(); - file.close(); - return data; + // check meta and file + std::map<int, QueryData> query_map = { + {0, {{"99", "66", "56", "87", "85", "96", "20000"}, {21, 25, 22, 18, 14, 18, 0}}}, + {3, {{"99", "66", "56", "87", "85", "96", "10000"}, {12, 20, 25, 23, 16, 24, 0}}}, + {1, {{"good", "maybe", "great", "null"}, {197, 191, 194, 0}}}, + {2, {{"musicstream.com", "http", "https", "null"}, {191, 799, 1201, 0}}}}; + IndexCompactionUtils::check_meta_and_file(output_rowset_index, _tablet_schema, query_map); + IndexCompactionUtils::check_meta_and_file(output_rowset_normal, _tablet_schema, query_map); } -bool query_bkd(const TabletIndex* index, - std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, - const std::vector<int>& query_data, const std::vector<int>& query_result) { - const auto& idx_reader = BkdIndexReader::create_shared(index, inverted_index_file_reader); - const auto& index_searcher_builder = std::make_unique<BKDIndexSearcherBuilder>(); - auto dir = inverted_index_file_reader->open(index); - EXPECT_TRUE(dir.has_value()); - auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); - EXPECT_TRUE(searcher_result.has_value()); - auto bkd_searcher = std::get_if<BKDIndexSearcherPtr>(&searcher_result.value()); - EXPECT_TRUE(bkd_searcher != nullptr); - idx_reader->_type_info = get_scalar_type_info((FieldType)(*bkd_searcher)->type); - EXPECT_TRUE(idx_reader->_type_info != nullptr); - idx_reader->_value_key_coder = get_key_coder(idx_reader->_type_info->type()); - - for (int i = 0; i < query_data.size(); i++) { - vectorized::Field param_value = Int32(query_data[i]); - std::unique_ptr<segment_v2::InvertedIndexQueryParamFactory> query_param = nullptr; - EXPECT_TRUE(segment_v2::InvertedIndexQueryParamFactory::create_query_value( - PrimitiveType::TYPE_INT, ¶m_value, query_param) - .ok()); - auto result = std::make_shared<roaring::Roaring>(); - EXPECT_TRUE(idx_reader - ->invoke_bkd_query(query_param->get_value(), - InvertedIndexQueryType::EQUAL_QUERY, *bkd_searcher, - result) - .ok()); - EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; - } - return true; +TEST_F(IndexCompactionTest, test_col_unique_ids_empty) { + // clear column unique id in tablet index 10001 and rebuild tablet_schema + TabletSchemaPB schema_pb; + _tablet_schema->to_schema_pb(&schema_pb); + auto* index_pb = schema_pb.mutable_index(1); + index_pb->clear_col_unique_id(); + _tablet_schema->init_from_pb(schema_pb); + + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 3); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); + + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + // index 10001 cannot be found in idx file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(!dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + EXPECT_THAT(dir_idx_compaction.error().to_string(), + testing::HasSubstr("No index with id 10001 found")); } -bool query_string(const TabletIndex* index, - std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, - const std::string& column_name, const std::vector<std::string>& query_data, - const std::vector<int>& query_result) { - const auto& idx_reader = - StringTypeInvertedIndexReader::create_shared(index, inverted_index_file_reader); - const auto& index_searcher_builder = std::make_unique<FulltextIndexSearcherBuilder>(); - auto dir = inverted_index_file_reader->open(index); - EXPECT_TRUE(dir.has_value()); - auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); - EXPECT_TRUE(searcher_result.has_value()); - auto string_searcher = std::get_if<FulltextIndexSearcherPtr>(&searcher_result.value()); - EXPECT_TRUE(string_searcher != nullptr); - std::wstring column_name_ws = StringUtil::string_to_wstring(column_name); - - for (int i = 0; i < query_data.size(); i++) { - TQueryOptions queryOptions; - auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY, *string_searcher, - queryOptions, nullptr); - EXPECT_TRUE(query != nullptr); - InvertedIndexQueryInfo query_info; - query_info.field_name = column_name_ws; - query_info.terms.emplace_back(query_data[i]); - query->add(query_info); - auto result = std::make_shared<roaring::Roaring>(); - query->search(*result); - EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; - } - return true; +TEST_F(IndexCompactionTest, test_tablet_index_id_not_equal) { + // replace unique id from 2 to 1 in tablet index 10002 and rebuild tablet_schema + TabletSchemaPB schema_pb; + _tablet_schema->to_schema_pb(&schema_pb); + auto* index_pb = schema_pb.mutable_index(2); + index_pb->set_col_unique_id(0, 1); + _tablet_schema->init_from_pb(schema_pb); + + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 3); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10001 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(1)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); + + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + // index 10002 cannot be found in idx file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10002, ""); + EXPECT_TRUE(!dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + EXPECT_THAT(dir_idx_compaction.error().to_string(), + testing::HasSubstr("No index with id 10002 found")); } -bool query_fulltext(const TabletIndex* index, - std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, - const std::string& column_name, const std::vector<std::string>& query_data, - const std::vector<int>& query_result) { - const auto& idx_reader = FullTextIndexReader::create_shared(index, inverted_index_file_reader); - const auto& index_searcher_builder = std::make_unique<FulltextIndexSearcherBuilder>(); - auto dir = inverted_index_file_reader->open(index); - EXPECT_TRUE(dir.has_value()); - auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); - EXPECT_TRUE(searcher_result.has_value()); - auto string_searcher = std::get_if<FulltextIndexSearcherPtr>(&searcher_result.value()); - EXPECT_TRUE(string_searcher != nullptr); - std::wstring column_name_ws = StringUtil::string_to_wstring(column_name); - - for (int i = 0; i < query_data.size(); i++) { - TQueryOptions queryOptions; - auto query = QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, *string_searcher, - queryOptions, nullptr); - EXPECT_TRUE(query != nullptr); - InvertedIndexQueryInfo query_info; - query_info.field_name = column_name_ws; - query_info.terms.emplace_back(query_data[i]); - query->add(query_info); - auto result = std::make_shared<roaring::Roaring>(); - query->search(*result); - EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; - } - return true; +TEST_F(IndexCompactionTest, test_tablet_schema_tablet_index_is_null) { + // set index suffix in tablet index 10001 and rebuild tablet_schema + // simulate the case that index is null, tablet_schema->inverted_index(1) will return nullptr + TabletSchemaPB schema_pb; + _tablet_schema->to_schema_pb(&schema_pb); + auto* index_pb = schema_pb.mutable_index(1); + index_pb->set_index_suffix_name("mock"); + _tablet_schema->init_from_pb(schema_pb); + + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 3); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); + + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + // index 10001 cannot be found in idx file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(!dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + EXPECT_THAT(dir_idx_compaction.error().to_string(), + testing::HasSubstr("No index with id 10001 found")); } -TEST_F(IndexCompactionTest, write_index_test) { +TEST_F(IndexCompactionTest, test_rowset_schema_tablet_index_is_null) { EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); std::string data_file1 = - _curreent_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; std::string data_file2 = - _curreent_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; - - std::vector<std::vector<DataRow>> data; - data.emplace_back(read_data(data_file1)); - data.emplace_back(read_data(data_file2)); - - std::vector<RowsetSharedPtr> rowsets(data.size()); - for (int i = 0; i < data.size(); i++) { - const auto& res = - RowsetFactory::create_rowset_writer(*_engine_ref, rowset_writer_context(), false); - EXPECT_TRUE(res.has_value()) << res.error(); - const auto& rowset_writer = res.value(); - - Block block = _tablet_schema->create_block(); - auto columns = block.mutate_columns(); - for (const auto& row : data[i]) { - vectorized::Field key = Int32(row.key); - vectorized::Field v1(row.word); - vectorized::Field v2(row.url); - vectorized::Field v3 = Int32(row.num); - columns[0]->insert(key); - columns[1]->insert(v1); - columns[2]->insert(v2); - columns[3]->insert(v3); - } - EXPECT_TRUE(rowset_writer->add_block(&block).ok()); - EXPECT_TRUE(rowset_writer->flush().ok()); - const auto& dst_writer = dynamic_cast<BaseBetaRowsetWriter*>(rowset_writer.get()); - - // inverted index file writer - for (const auto& [seg_id, idx_file_writer] : dst_writer->_idx_files.get_file_writers()) { - EXPECT_TRUE(idx_file_writer->_closed); - } - - EXPECT_TRUE(rowset_writer->build(rowsets[i]).ok()); - EXPECT_TRUE(_tablet->add_rowset(rowsets[i]).ok()); - EXPECT_TRUE(rowsets[i]->num_segments() == 5); - - // check rowset meta and file - for (int seg_id = 0; seg_id < rowsets[i]->num_segments(); seg_id++) { - const auto& index_info = rowsets[i]->_rowset_meta->inverted_index_file_info(seg_id); - EXPECT_TRUE(index_info.has_index_size()); - const auto& fs = rowsets[i]->_rowset_meta->fs(); - const auto& file_name = fmt::format("{}/{}_{}.idx", rowsets[i]->tablet_path(), - rowsets[i]->rowset_id().to_string(), seg_id); - int64_t file_size = 0; - EXPECT_TRUE(fs->file_size(file_name, &file_size).ok()); - EXPECT_EQ(index_info.index_size(), file_size); - - const auto& seg_path = rowsets[i]->segment_path(seg_id); - EXPECT_TRUE(seg_path.has_value()); - const auto& index_file_path_prefix = - InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); - auto inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>( - fs, std::string(index_file_path_prefix), - _tablet_schema->get_inverted_index_storage_format(), index_info); - EXPECT_TRUE(inverted_index_file_reader->init().ok()); - const auto& dirs = inverted_index_file_reader->get_all_directories(); - EXPECT_TRUE(dirs.has_value()); - EXPECT_EQ(dirs.value().size(), 4); - } - } + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + // set index suffix in tablet index 10001 and rebuild tablet_schema + // simulate the case that index is null, tablet_schema->inverted_index(1) will return nullptr + TabletSchemaPB schema_pb; + TabletSchemaSPtr mock_schema = std::make_shared<TabletSchema>(); + _tablet_schema->to_schema_pb(&schema_pb); + auto* index_pb = schema_pb.mutable_index(1); + index_pb->set_index_suffix_name("mock"); + mock_schema->init_from_pb(schema_pb); + rowsets[0]->_schema = mock_schema; + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); - CumulativeCompaction compaction(*_engine_ref, _tablet); - compaction._input_rowsets = std::move(rowsets); - compaction.build_basic_info(); + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + // index 10001 cannot be found in idx file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + // check index 10001 term stats + std::ostringstream oss; + IndexCompactionUtils::check_terms_stats(dir_idx_compaction.value().get(), oss); + std::string output = oss.str(); + EXPECT_EQ(output, expected_output); +} - std::vector<RowsetReaderSharedPtr> input_rs_readers; - input_rs_readers.reserve(compaction._input_rowsets.size()); - for (auto& rowset : compaction._input_rowsets) { - RowsetReaderSharedPtr rs_reader; - EXPECT_TRUE(rowset->create_reader(&rs_reader).ok()); - input_rs_readers.push_back(std::move(rs_reader)); - } +TEST_F(IndexCompactionTest, test_tablet_index_properties_not_equal) { + // add mock property in tablet index 10001 and rebuild tablet_schema + // simulate the case that index properties not equal among input rowsets + TabletSchemaSPtr mock_schema = std::make_shared<TabletSchema>(); + TabletSchemaPB schema_pb; + _tablet_schema->to_schema_pb(&schema_pb); + auto* index_pb = schema_pb.mutable_index(1); + (*index_pb->mutable_properties())["mock_key"] = "mock_value"; + mock_schema->init_from_pb(schema_pb); - RowsetWriterContext ctx; - EXPECT_TRUE(compaction.construct_output_rowset_writer(ctx).ok()); - - // col word - EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(1)); - // col url - EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); - - compaction._stats.rowid_conversion = compaction._rowid_conversion.get(); - EXPECT_TRUE(Merger::vertical_merge_rowsets(_tablet, compaction.compaction_type(), - *(compaction._cur_tablet_schema), input_rs_readers, - compaction._output_rs_writer.get(), 100000, 5, - &compaction._stats) - .ok()); - const auto& dst_writer = - dynamic_cast<BaseBetaRowsetWriter*>(compaction._output_rs_writer.get()); - for (const auto& [seg_id, idx_file_writer] : dst_writer->_idx_files.get_file_writers()) { - EXPECT_FALSE(idx_file_writer->_closed); - } - auto st = compaction.do_inverted_index_compaction(); + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + // set mock_schema to the first input rowset + rowsets[0]->_schema = mock_schema; + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); EXPECT_TRUE(st.ok()) << st.to_string(); - st = compaction._output_rs_writer->build(compaction._output_rowset); + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + + // check index 10001 term stats + std::ostringstream oss; + IndexCompactionUtils::check_terms_stats(dir_idx_compaction.value().get(), oss); + std::string output = oss.str(); + EXPECT_EQ(output, expected_output); +} + +TEST_F(IndexCompactionTest, test_is_skip_index_compaction_not_empty) { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + // set col_unique_id=1(index_id=10001) to skip index compaction + rowsets[0]->set_skip_index_compaction(1); + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); EXPECT_TRUE(st.ok()) << st.to_string(); - for (const auto& [seg_id, idx_file_writer] : dst_writer->_idx_files.get_file_writers()) { - EXPECT_TRUE(idx_file_writer->_closed); - } - EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); - - const auto& output_rowset = compaction._output_rowset; - - // check rowset meta and file - for (int seg_id = 0; seg_id < output_rowset->num_segments(); seg_id++) { - // meta - const auto& index_info = output_rowset->_rowset_meta->inverted_index_file_info(seg_id); - EXPECT_TRUE(index_info.has_index_size()); - const auto& fs = output_rowset->_rowset_meta->fs(); - const auto& file_name = fmt::format("{}/{}_{}.idx", output_rowset->tablet_path(), - output_rowset->rowset_id().to_string(), seg_id); - int64_t file_size = 0; - EXPECT_TRUE(fs->file_size(file_name, &file_size).ok()); - EXPECT_EQ(index_info.index_size(), file_size); - - // file - const auto& seg_path = output_rowset->segment_path(seg_id); - EXPECT_TRUE(seg_path.has_value()); - const auto& index_file_path_prefix = - InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); - auto inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>( - fs, std::string(index_file_path_prefix), - _tablet_schema->get_inverted_index_storage_format(), index_info); - EXPECT_TRUE(inverted_index_file_reader->init().ok()); - const auto& dirs = inverted_index_file_reader->get_all_directories(); - EXPECT_TRUE(dirs.has_value()); - EXPECT_EQ(dirs.value().size(), 4); - - // read col key - const auto& key = _tablet_schema->column_by_uid(0); - const auto* key_index = _tablet_schema->inverted_index(key); - EXPECT_TRUE(key_index != nullptr); - std::vector<int> query_data {99, 66, 56, 87, 85, 96, 20000}; - std::vector<int> query_result {21, 25, 22, 18, 14, 18, 0}; - EXPECT_TRUE(query_bkd(key_index, inverted_index_file_reader, query_data, query_result)); - - // read col v3 - const auto& v3_column = _tablet_schema->column_by_uid(3); - const auto* v3_index = _tablet_schema->inverted_index(v3_column); - EXPECT_TRUE(v3_index != nullptr); - std::vector<int> query_data3 {99, 66, 56, 87, 85, 96, 10000}; - std::vector<int> query_result3 {12, 20, 25, 23, 16, 24, 0}; - EXPECT_TRUE(query_bkd(v3_index, inverted_index_file_reader, query_data3, query_result3)); - - // read col v1 - const auto& v1_column = _tablet_schema->column_by_uid(1); - const auto* v1_index = _tablet_schema->inverted_index(v1_column); - EXPECT_TRUE(v1_index != nullptr); - std::vector<std::string> query_data1 {"good", "maybe", "great", "null"}; - std::vector<int> query_result1 {197, 191, 194, 0}; - EXPECT_TRUE(query_string(v1_index, inverted_index_file_reader, "1", query_data1, - query_result1)); - - // read col v2 - const auto& v2_column = _tablet_schema->column_by_uid(2); - const auto* v2_index = _tablet_schema->inverted_index(v2_column); - EXPECT_TRUE(v2_index != nullptr); - std::vector<std::string> query_data2 {"musicstream.com", "http", "https", "null"}; - std::vector<int> query_result2 {191, 799, 1201, 0}; - EXPECT_TRUE(query_fulltext(v2_index, inverted_index_file_reader, "2", query_data2, - query_result2)); + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + + // check index 10001 term stats + std::ostringstream oss; + IndexCompactionUtils::check_terms_stats(dir_idx_compaction.value().get(), oss); + std::string output = oss.str(); + EXPECT_EQ(output, expected_output); +} + +TEST_F(IndexCompactionTest, test_rowset_fs_nullptr) { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 1); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + // set mock_id to resource_id to simulate getting fs nullptr + RowsetMetaSharedPtr mock_rowset_meta = std::make_shared<RowsetMeta>(); + RowsetMetaPB rs_meta_pb; + rowsets[0]->to_rowset_pb(&rs_meta_pb); + rs_meta_pb.set_resource_id("mock_id"); + mock_rowset_meta->init_from_pb(rs_meta_pb); + rowsets[0]->_rowset_meta = mock_rowset_meta; + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(!st.ok()); + EXPECT_THAT(st.to_string(), testing::HasSubstr("[E-206]get fs failed")); +} + +TEST_F(IndexCompactionTest, test_input_row_num_zero) { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // only index id 10002 will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 2); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(1)); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + // set num_rows to 0 to simulate input_row_num = 0 + for (auto rowset : rowsets) { + RowsetMetaSharedPtr mock_rowset_meta = std::make_shared<RowsetMeta>(); + RowsetMetaPB rs_meta_pb; + rowset->to_rowset_pb(&rs_meta_pb); + rs_meta_pb.set_num_rows(0); + mock_rowset_meta->init_from_pb(rs_meta_pb); + rowset->_rowset_meta = mock_rowset_meta; } + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()); + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + // index 10001 cannot be found in idx file + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(!dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + EXPECT_THAT(dir_idx_compaction.error().to_string(), + testing::HasSubstr("No index with id 10001 found")); +} + +TEST_F(IndexCompactionTest, test_cols_to_do_index_compaction_empty) { + // add mock property in tablet index 10001, 10002 and rebuild tablet_schema + // simulate the case that index properties not equal among input rowsets + // the two cols will skip index compaction and make ctx.columns_to_do_index_compaction empty + TabletSchemaSPtr mock_schema = std::make_shared<TabletSchema>(); + TabletSchemaPB schema_pb; + _tablet_schema->to_schema_pb(&schema_pb); + auto* index_pb_1 = schema_pb.mutable_index(1); + (*index_pb_1->mutable_properties())["mock_key"] = "mock_value"; + auto* index_pb_2 = schema_pb.mutable_index(2); + (*index_pb_2->mutable_properties())["mock_key"] = "mock_value"; + mock_schema->init_from_pb(schema_pb); + + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + // none index will do index compaction + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 0); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + // set mock_schema to the first input rowset + rowsets[0]->_schema = mock_schema; + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); + + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + // check index file + auto dir_idx_compaction_1 = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(dir_idx_compaction_1.has_value()) << dir_idx_compaction_1.error(); + + // check index 10001 term stats + std::ostringstream oss; + IndexCompactionUtils::check_terms_stats(dir_idx_compaction_1.value().get(), oss); + std::string output = oss.str(); + EXPECT_EQ(output, expected_output); + + auto dir_idx_compaction_2 = inverted_index_file_reader_index->_open(10002, ""); + EXPECT_TRUE(dir_idx_compaction_2.has_value()) << dir_idx_compaction_2.error(); +} + +TEST_F(IndexCompactionTest, test_index_compaction_with_delete) { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); + EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); + std::string data_file1 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; + std::string data_file2 = + _current_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; + std::vector<std::string> data_files; + data_files.push_back(data_file1); + data_files.push_back(data_file2); + + std::vector<RowsetSharedPtr> rowsets(data_files.size()); + auto custom_check_build_rowsets = [](const int32_t& size) { EXPECT_EQ(size, 4); }; + IndexCompactionUtils::build_rowsets(_data_dir, _tablet_schema, _tablet, _engine_ref, rowsets, + data_files, custom_check_build_rowsets); + + // create delete predicate rowset and add to tablet + auto delete_rowset = IndexCompactionUtils::create_delete_predicate_rowset( + _tablet_schema, "v1='great'", inc_id++); + EXPECT_TRUE(_tablet->add_rowset(delete_rowset).ok()); + EXPECT_TRUE(_tablet->rowset_map().size() == 3); + rowsets.push_back(delete_rowset); + EXPECT_TRUE(rowsets.size() == 3); + + auto custom_check_index = [](const BaseCompaction& compaction, const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 2); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(1)); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_index; + auto st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, true, + output_rowset_index, custom_check_index); + EXPECT_TRUE(st.ok()) << st.to_string(); + + const auto& seg_path = output_rowset_index->segment_path(0); + EXPECT_TRUE(seg_path.has_value()) << seg_path.error(); + auto inverted_index_file_reader_index = IndexCompactionUtils::init_index_file_reader( + output_rowset_index, seg_path.value(), + _tablet_schema->get_inverted_index_storage_format()); + + auto custom_check_normal = [](const BaseCompaction& compaction, + const RowsetWriterContext& ctx) { + EXPECT_EQ(compaction._cur_tablet_schema->inverted_indexes().size(), 4); + EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 0); + EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); + }; + + RowsetSharedPtr output_rowset_normal; + st = IndexCompactionUtils::do_compaction(rowsets, _engine_ref, _tablet, false, + output_rowset_normal, custom_check_normal); + EXPECT_TRUE(st.ok()) << st.to_string(); + const auto& seg_path_normal = output_rowset_normal->segment_path(0); + EXPECT_TRUE(seg_path_normal.has_value()) << seg_path_normal.error(); + auto inverted_index_file_reader_normal = IndexCompactionUtils::init_index_file_reader( + output_rowset_normal, seg_path_normal.value(), + _tablet_schema->get_inverted_index_storage_format()); + // check index file terms + auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); + EXPECT_TRUE(dir_idx_compaction.has_value()) << dir_idx_compaction.error(); + auto dir_normal_compaction = inverted_index_file_reader_normal->_open(10001, ""); + EXPECT_TRUE(dir_normal_compaction.has_value()) << dir_normal_compaction.error(); + std::ostringstream oss; + IndexCompactionUtils::check_terms_stats(dir_idx_compaction->get(), oss); + std::string output = oss.str(); + EXPECT_EQ(output, expected_delete_output); + oss.str(""); + oss.clear(); + IndexCompactionUtils::check_terms_stats(dir_normal_compaction->get(), oss); + output = oss.str(); + EXPECT_EQ(output, expected_delete_output); + + st = IndexCompactionUtils::check_idx_file_correctness(dir_idx_compaction->get(), + dir_normal_compaction->get()); + EXPECT_TRUE(st.ok()) << st.to_string(); + + // check meta and file + std::map<int, QueryData> query_map = { + {0, {{"99", "66", "56", "87", "85", "96", "20000"}, {19, 21, 21, 16, 14, 18, 0}}}, + {3, {{"99", "66", "56", "87", "85", "96", "10000"}, {12, 18, 22, 21, 16, 20, 0}}}, + {1, {{"good", "maybe", "great", "null"}, {197, 191, 0, 0}}}, + {2, {{"musicstream.com", "http", "https", "null"}, {176, 719, 1087, 0}}}}; + IndexCompactionUtils::check_meta_and_file(output_rowset_index, _tablet_schema, query_map); + IndexCompactionUtils::check_meta_and_file(output_rowset_normal, _tablet_schema, query_map); } } // namespace doris diff --git a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp deleted file mode 100644 index a46f5f210df..00000000000 --- a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp +++ /dev/null @@ -1,671 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <gtest/gtest.h> - -#include <iostream> -#include <memory> - -#include "CLucene/StdHeader.h" -#include "CLucene/config/repl_wchar.h" -#include "json2pb/json_to_pb.h" -#include "json2pb/pb_to_json.h" -#include "olap/base_compaction.h" -#include "olap/rowset/beta_rowset.h" -#include "olap/rowset/beta_rowset_writer.h" -#include "olap/rowset/rowset_factory.h" -#include "olap/rowset/segment_v2/inverted_index/query/query_factory.h" -#include "olap/rowset/segment_v2/inverted_index_file_reader.h" -#include "olap/storage_engine.h" - -namespace doris { - -using namespace doris::vectorized; - -constexpr static uint32_t MAX_PATH_LEN = 1024; -constexpr static std::string_view dest_dir = "/ut_dir/inverted_index_test"; -constexpr static std::string_view tmp_dir = "./ut_dir/tmp"; -static int64_t inc_id = 1000; - -struct DataRow { - int key; - std::string word; - std::string url; - int num; -}; - -static std::vector<DataRow> read_data(const std::string file_name) { - std::ifstream file(file_name); - EXPECT_TRUE(file.is_open()); - - std::string line; - std::vector<DataRow> data; - - while (std::getline(file, line)) { - std::stringstream ss(line); - std::string item; - DataRow row; - EXPECT_TRUE(std::getline(ss, item, ',')); - row.key = std::stoi(item); - EXPECT_TRUE(std::getline(ss, item, ',')); - row.word = item; - EXPECT_TRUE(std::getline(ss, item, ',')); - row.url = item; - EXPECT_TRUE(std::getline(ss, item, ',')); - row.num = std::stoi(item); - data.emplace_back(std::move(row)); - } - - file.close(); - return data; -} - -static bool query_bkd(const TabletIndex* index, - std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, - const std::vector<int>& query_data, const std::vector<int>& query_result) { - const auto& idx_reader = BkdIndexReader::create_shared(index, inverted_index_file_reader); - const auto& index_searcher_builder = std::make_unique<BKDIndexSearcherBuilder>(); - auto dir = inverted_index_file_reader->open(index); - EXPECT_TRUE(dir.has_value()); - auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); - EXPECT_TRUE(searcher_result.has_value()); - auto bkd_searcher = std::get_if<BKDIndexSearcherPtr>(&searcher_result.value()); - EXPECT_TRUE(bkd_searcher != nullptr); - idx_reader->_type_info = get_scalar_type_info((FieldType)(*bkd_searcher)->type); - EXPECT_TRUE(idx_reader->_type_info != nullptr); - idx_reader->_value_key_coder = get_key_coder(idx_reader->_type_info->type()); - - for (int i = 0; i < query_data.size(); i++) { - vectorized::Field param_value = Int32(query_data[i]); - std::unique_ptr<segment_v2::InvertedIndexQueryParamFactory> query_param = nullptr; - EXPECT_TRUE(segment_v2::InvertedIndexQueryParamFactory::create_query_value( - PrimitiveType::TYPE_INT, ¶m_value, query_param) - .ok()); - auto result = std::make_shared<roaring::Roaring>(); - EXPECT_TRUE(idx_reader - ->invoke_bkd_query(query_param->get_value(), - InvertedIndexQueryType::EQUAL_QUERY, *bkd_searcher, - result) - .ok()); - EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; - } - return true; -} - -static bool query_string(const TabletIndex* index, - std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, - const std::string& column_name, const std::vector<std::string>& query_data, - const std::vector<int>& query_result) { - const auto& idx_reader = - StringTypeInvertedIndexReader::create_shared(index, inverted_index_file_reader); - const auto& index_searcher_builder = std::make_unique<FulltextIndexSearcherBuilder>(); - auto dir = inverted_index_file_reader->open(index); - EXPECT_TRUE(dir.has_value()); - auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); - EXPECT_TRUE(searcher_result.has_value()); - auto string_searcher = std::get_if<FulltextIndexSearcherPtr>(&searcher_result.value()); - EXPECT_TRUE(string_searcher != nullptr); - std::wstring column_name_ws = StringUtil::string_to_wstring(column_name); - - for (int i = 0; i < query_data.size(); i++) { - TQueryOptions queryOptions; - auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY, *string_searcher, - queryOptions, nullptr); - EXPECT_TRUE(query != nullptr); - InvertedIndexQueryInfo query_info; - query_info.field_name = column_name_ws; - query_info.terms.emplace_back(query_data[i]); - query->add(query_info); - auto result = std::make_shared<roaring::Roaring>(); - query->search(*result); - EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; - } - return true; -} - -static bool query_fulltext(const TabletIndex* index, - std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, - const std::string& column_name, - const std::vector<std::string>& query_data, - const std::vector<int>& query_result) { - const auto& idx_reader = FullTextIndexReader::create_shared(index, inverted_index_file_reader); - const auto& index_searcher_builder = std::make_unique<FulltextIndexSearcherBuilder>(); - auto dir = inverted_index_file_reader->open(index); - EXPECT_TRUE(dir.has_value()); - auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); - EXPECT_TRUE(searcher_result.has_value()); - auto string_searcher = std::get_if<FulltextIndexSearcherPtr>(&searcher_result.value()); - EXPECT_TRUE(string_searcher != nullptr); - std::wstring column_name_ws = StringUtil::string_to_wstring(column_name); - - for (int i = 0; i < query_data.size(); i++) { - TQueryOptions queryOptions; - auto query = QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, *string_searcher, - queryOptions, nullptr); - EXPECT_TRUE(query != nullptr); - InvertedIndexQueryInfo query_info; - query_info.field_name = column_name_ws; - query_info.terms.emplace_back(query_data[i]); - query->add(query_info); - auto result = std::make_shared<roaring::Roaring>(); - query->search(*result); - EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; - } - return true; -} - -static void check_terms_stats(lucene::store::Directory* dir) { - IndexReader* r = IndexReader::open(dir); - - printf("Max Docs: %d\n", r->maxDoc()); - printf("Num Docs: %d\n", r->numDocs()); - - int64_t ver = r->getCurrentVersion(dir); - printf("Current Version: %f\n", (float_t)ver); - - TermEnum* te = r->terms(); - int32_t nterms; - for (nterms = 0; te->next(); nterms++) { - /* empty */ - std::string token = - lucene_wcstoutf8string(te->term(false)->text(), te->term(false)->textLength()); - std::string field = lucene_wcstoutf8string(te->term(false)->field(), - lenOfString(te->term(false)->field())); - - printf("Field: %s ", field.c_str()); - printf("Term: %s ", token.c_str()); - printf("Freq: %d\n", te->docFreq()); - if (false) { - TermDocs* td = r->termDocs(te->term()); - while (td->next()) { - printf("DocID: %d ", td->doc()); - printf("TermFreq: %d\n", td->freq()); - } - _CLLDELETE(td); - } - } - printf("Term count: %d\n\n", nterms); - te->close(); - _CLLDELETE(te); - - r->close(); - _CLLDELETE(r); -} -static Status check_idx_file_correctness(lucene::store::Directory* index_reader, - lucene::store::Directory* tmp_index_reader) { - lucene::index::IndexReader* idx_reader = lucene::index::IndexReader::open(index_reader); - lucene::index::IndexReader* tmp_idx_reader = lucene::index::IndexReader::open(tmp_index_reader); - - // compare numDocs - if (idx_reader->numDocs() != tmp_idx_reader->numDocs()) { - return Status::InternalError( - "index compaction correctness check failed, numDocs not equal, idx_numDocs={}, " - "tmp_idx_numDocs={}", - idx_reader->numDocs(), tmp_idx_reader->numDocs()); - } - - lucene::index::TermEnum* term_enum = idx_reader->terms(); - lucene::index::TermEnum* tmp_term_enum = tmp_idx_reader->terms(); - lucene::index::TermDocs* term_docs = nullptr; - lucene::index::TermDocs* tmp_term_docs = nullptr; - - // iterate TermEnum - while (term_enum->next() && tmp_term_enum->next()) { - std::string token = lucene_wcstoutf8string(term_enum->term(false)->text(), - term_enum->term(false)->textLength()); - std::string field = lucene_wcstoutf8string(term_enum->term(false)->field(), - lenOfString(term_enum->term(false)->field())); - std::string tmp_token = lucene_wcstoutf8string(tmp_term_enum->term(false)->text(), - tmp_term_enum->term(false)->textLength()); - std::string tmp_field = - lucene_wcstoutf8string(tmp_term_enum->term(false)->field(), - lenOfString(tmp_term_enum->term(false)->field())); - // compare token and field - if (field != tmp_field) { - return Status::InternalError( - "index compaction correctness check failed, fields not equal, field={}, " - "tmp_field={}", - field, field); - } - if (token != tmp_token) { - return Status::InternalError( - "index compaction correctness check failed, tokens not equal, token={}, " - "tmp_token={}", - token, tmp_token); - } - - // get term's docId and freq - term_docs = idx_reader->termDocs(term_enum->term(false)); - tmp_term_docs = tmp_idx_reader->termDocs(tmp_term_enum->term(false)); - - // compare term's docId and freq - while (term_docs->next() && tmp_term_docs->next()) { - if (term_docs->doc() != tmp_term_docs->doc() || - term_docs->freq() != tmp_term_docs->freq()) { - return Status::InternalError( - "index compaction correctness check failed, docId or freq not equal, " - "docId={}, tmp_docId={}, freq={}, tmp_freq={}", - term_docs->doc(), tmp_term_docs->doc(), term_docs->freq(), - tmp_term_docs->freq()); - } - } - - // check if there are remaining docs - if (term_docs->next() || tmp_term_docs->next()) { - return Status::InternalError( - "index compaction correctness check failed, number of docs not equal for " - "term={}, tmp_term={}", - token, tmp_token); - } - if (term_docs) { - term_docs->close(); - _CLLDELETE(term_docs); - } - if (tmp_term_docs) { - tmp_term_docs->close(); - _CLLDELETE(tmp_term_docs); - } - } - - // check if there are remaining terms - if (term_enum->next() || tmp_term_enum->next()) { - return Status::InternalError( - "index compaction correctness check failed, number of terms not equal"); - } - if (term_enum) { - term_enum->close(); - _CLLDELETE(term_enum); - } - if (tmp_term_enum) { - tmp_term_enum->close(); - _CLLDELETE(tmp_term_enum); - } - if (idx_reader) { - idx_reader->close(); - _CLLDELETE(idx_reader); - } - if (tmp_idx_reader) { - tmp_idx_reader->close(); - _CLLDELETE(tmp_idx_reader); - } - return Status::OK(); -} - -static RowsetSharedPtr do_compaction(std::vector<RowsetSharedPtr> rowsets, - StorageEngine* engine_ref, TabletSharedPtr tablet, - bool is_index_compaction) { - config::inverted_index_compaction_enable = is_index_compaction; - // only base compaction can handle delete predicate - BaseCompaction compaction(*engine_ref, tablet); - compaction._input_rowsets = std::move(rowsets); - compaction.build_basic_info(); - - std::vector<RowsetReaderSharedPtr> input_rs_readers; - input_rs_readers.reserve(compaction._input_rowsets.size()); - for (auto& rowset : compaction._input_rowsets) { - RowsetReaderSharedPtr rs_reader; - EXPECT_TRUE(rowset->create_reader(&rs_reader).ok()); - input_rs_readers.push_back(std::move(rs_reader)); - } - - RowsetWriterContext ctx; - EXPECT_TRUE(compaction.construct_output_rowset_writer(ctx).ok()); - - if (is_index_compaction) { - EXPECT_TRUE(ctx.columns_to_do_index_compaction.size() == 2); - // col v1 - EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(1)); - // col v2 - EXPECT_TRUE(ctx.columns_to_do_index_compaction.contains(2)); - } - - compaction._stats.rowid_conversion = compaction._rowid_conversion.get(); - EXPECT_TRUE(Merger::vertical_merge_rowsets(tablet, compaction.compaction_type(), - *(compaction._cur_tablet_schema), input_rs_readers, - compaction._output_rs_writer.get(), 100000, 5, - &compaction._stats) - .ok()); - const auto& dst_writer = - dynamic_cast<BaseBetaRowsetWriter*>(compaction._output_rs_writer.get()); - for (const auto& [seg_id, idx_file_writer] : dst_writer->_idx_files.get_file_writers()) { - EXPECT_FALSE(idx_file_writer->_closed); - } - Status st = compaction.do_inverted_index_compaction(); - EXPECT_TRUE(st.ok()) << st.to_string(); - - st = compaction._output_rs_writer->build(compaction._output_rowset); - EXPECT_TRUE(st.ok()) << st.to_string(); - - for (const auto& [seg_id, idx_file_writer] : dst_writer->_idx_files.get_file_writers()) { - EXPECT_TRUE(idx_file_writer->_closed); - } - EXPECT_TRUE(compaction._output_rowset->num_segments() == 1); - - return compaction._output_rowset; -} - -class IndexCompactionDeleteTest : public ::testing::Test { -protected: - void SetUp() override { - // absolute dir - char buffer[MAX_PATH_LEN]; - EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); - _curreent_dir = std::string(buffer); - _absolute_dir = _curreent_dir + std::string(dest_dir); - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok()); - EXPECT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok()); - - // tmp dir - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok()); - EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok()); - std::vector<StorePath> paths; - paths.emplace_back(std::string(tmp_dir), 1024000000); - auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths); - Status st = tmp_file_dirs->init(); - EXPECT_TRUE(st.ok()) << st.to_json(); - ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs)); - - // storage engine - doris::EngineOptions options; - auto engine = std::make_unique<StorageEngine>(options); - _engine_ref = engine.get(); - _data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir); - static_cast<void>(_data_dir->update_capacity()); - ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); - - // tablet_schema - TabletSchemaPB schema_pb; - schema_pb.set_keys_type(KeysType::DUP_KEYS); - schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); - - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10000, "key_index", 0, - "INT", "key"); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10001, "v1_index", 1, - "STRING", "v1"); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10002, "v2_index", 2, - "STRING", "v2", true); - construct_column(schema_pb.add_column(), schema_pb.add_index(), 10003, "v3_index", 3, "INT", - "v3"); - - _tablet_schema.reset(new TabletSchema); - _tablet_schema->init_from_pb(schema_pb); - - // tablet - TabletMetaSharedPtr tablet_meta(new TabletMeta(_tablet_schema)); - - _tablet.reset(new Tablet(*_engine_ref, tablet_meta, _data_dir.get())); - EXPECT_TRUE(_tablet->init().ok()); - } - void TearDown() override { - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok()); - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok()); - _engine_ref = nullptr; - ExecEnv::GetInstance()->set_storage_engine(nullptr); - } - - void init_rs_meta(RowsetMetaSharedPtr& rs_meta, int64_t start, int64_t end) { - std::string json_rowset_meta = R"({ - "rowset_id": 540081, - "tablet_id": 15673, - "partition_id": 10000, - "tablet_schema_hash": 567997577, - "rowset_type": "BETA_ROWSET", - "rowset_state": "VISIBLE", - "empty": false - })"; - RowsetMetaPB rowset_meta_pb; - json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb); - rowset_meta_pb.set_start_version(start); - rowset_meta_pb.set_end_version(end); - rs_meta->init_from_pb(rowset_meta_pb); - } - - RowsetSharedPtr create_delete_predicate_rowset(const TabletSchemaSPtr& schema, std::string pred, - int64_t version) { - DeletePredicatePB del_pred; - del_pred.add_sub_predicates(pred); - del_pred.set_version(1); - RowsetMetaSharedPtr rsm(new RowsetMeta()); - init_rs_meta(rsm, version, version); - RowsetId id; - id.init(version); - rsm->set_rowset_id(id); - rsm->set_delete_predicate(std::move(del_pred)); - rsm->set_tablet_schema(schema); - return std::make_shared<BetaRowset>(schema, rsm, ""); - } - - void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id, - const std::string& index_name, int32_t col_unique_id, - const std::string& column_type, const std::string& column_name, - bool parser = false) { - column_pb->set_unique_id(col_unique_id); - column_pb->set_name(column_name); - column_pb->set_type(column_type); - column_pb->set_is_key(false); - column_pb->set_is_nullable(true); - tablet_index->set_index_id(index_id); - tablet_index->set_index_name(index_name); - tablet_index->set_index_type(IndexType::INVERTED); - tablet_index->add_col_unique_id(col_unique_id); - if (parser) { - auto* properties = tablet_index->mutable_properties(); - (*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE; - } - } - - void check_meta_and_file(RowsetSharedPtr output_rowset) { - CHECK_EQ(output_rowset->num_segments(), 1); - // check rowset meta and file - int seg_id = 0; - // meta - const auto& index_info = output_rowset->_rowset_meta->inverted_index_file_info(seg_id); - EXPECT_TRUE(index_info.has_index_size()); - const auto& fs = output_rowset->_rowset_meta->fs(); - const auto& file_name = fmt::format("{}/{}_{}.idx", output_rowset->tablet_path(), - output_rowset->rowset_id().to_string(), seg_id); - int64_t file_size = 0; - EXPECT_TRUE(fs->file_size(file_name, &file_size).ok()); - EXPECT_EQ(index_info.index_size(), file_size); - - // file - const auto& seg_path = output_rowset->segment_path(seg_id); - EXPECT_TRUE(seg_path.has_value()); - const auto& index_file_path_prefix = - InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); - auto inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>( - fs, std::string(index_file_path_prefix), - _tablet_schema->get_inverted_index_storage_format(), index_info); - EXPECT_TRUE(inverted_index_file_reader->init().ok()); - const auto& dirs = inverted_index_file_reader->get_all_directories(); - EXPECT_TRUE(dirs.has_value()); - EXPECT_EQ(dirs.value().size(), 4); - - // read col key - const auto& key = _tablet_schema->column_by_uid(0); - const auto* key_index = _tablet_schema->inverted_index(key); - EXPECT_TRUE(key_index != nullptr); - std::vector<int> query_data {99, 66, 56, 87, 85, 96, 20000}; - std::vector<int> query_result {19, 21, 21, 16, 14, 18, 0}; - EXPECT_TRUE(query_bkd(key_index, inverted_index_file_reader, query_data, query_result)); - - // read col v3 - const auto& v3_column = _tablet_schema->column_by_uid(3); - const auto* v3_index = _tablet_schema->inverted_index(v3_column); - EXPECT_TRUE(v3_index != nullptr); - std::vector<int> query_data3 {99, 66, 56, 87, 85, 96, 10000}; - std::vector<int> query_result3 {12, 18, 22, 21, 16, 20, 0}; - EXPECT_TRUE(query_bkd(v3_index, inverted_index_file_reader, query_data3, query_result3)); - - // read col v1 - const auto& v1_column = _tablet_schema->column_by_uid(1); - const auto* v1_index = _tablet_schema->inverted_index(v1_column); - EXPECT_TRUE(v1_index != nullptr); - std::vector<std::string> query_data1 {"good", "maybe", "great", "null"}; - std::vector<int> query_result1 {197, 191, 0, 0}; - EXPECT_TRUE(query_string(v1_index, inverted_index_file_reader, "1", query_data1, - query_result1)); - - // read col v2 - const auto& v2_column = _tablet_schema->column_by_uid(2); - const auto* v2_index = _tablet_schema->inverted_index(v2_column); - EXPECT_TRUE(v2_index != nullptr); - std::vector<std::string> query_data2 {"musicstream.com", "http", "https", "null"}; - std::vector<int> query_result2 {176, 719, 1087, 0}; - EXPECT_TRUE(query_fulltext(v2_index, inverted_index_file_reader, "2", query_data2, - query_result2)); - } - - RowsetWriterContext rowset_writer_context() { - RowsetWriterContext context; - RowsetId rowset_id; - rowset_id.init(inc_id); - context.rowset_id = rowset_id; - context.rowset_type = BETA_ROWSET; - context.data_dir = _data_dir.get(); - context.rowset_state = VISIBLE; - context.tablet_schema = _tablet_schema; - context.tablet_path = _tablet->tablet_path(); - context.version = Version(inc_id, inc_id); - context.max_rows_per_segment = 200; - inc_id++; - return context; - } - - IndexCompactionDeleteTest() = default; - ~IndexCompactionDeleteTest() override = default; - -private: - TabletSchemaSPtr _tablet_schema = nullptr; - StorageEngine* _engine_ref = nullptr; - std::unique_ptr<DataDir> _data_dir = nullptr; - TabletSharedPtr _tablet = nullptr; - std::string _absolute_dir; - std::string _curreent_dir; -}; - -TEST_F(IndexCompactionDeleteTest, delete_index_test) { - EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_tablet->tablet_path()).ok()); - EXPECT_TRUE(io::global_local_filesystem()->create_directory(_tablet->tablet_path()).ok()); - std::string data_file1 = - _curreent_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data1.csv"; - std::string data_file2 = - _curreent_dir + "/be/test/olap/rowset/segment_v2/inverted_index/data/data2.csv"; - - std::vector<std::vector<DataRow>> data; - data.emplace_back(read_data(data_file1)); - data.emplace_back(read_data(data_file2)); - - std::vector<RowsetSharedPtr> rowsets(data.size()); - for (int i = 0; i < data.size(); i++) { - const auto& res = - RowsetFactory::create_rowset_writer(*_engine_ref, rowset_writer_context(), false); - EXPECT_TRUE(res.has_value()) << res.error(); - const auto& rowset_writer = res.value(); - - Block block = _tablet_schema->create_block(); - auto columns = block.mutate_columns(); - for (const auto& row : data[i]) { - vectorized::Field key = Int32(row.key); - vectorized::Field v1(row.word); - vectorized::Field v2(row.url); - vectorized::Field v3 = Int32(row.num); - columns[0]->insert(key); - columns[1]->insert(v1); - columns[2]->insert(v2); - columns[3]->insert(v3); - } - EXPECT_TRUE(rowset_writer->add_block(&block).ok()); - EXPECT_TRUE(rowset_writer->flush().ok()); - const auto& dst_writer = dynamic_cast<BaseBetaRowsetWriter*>(rowset_writer.get()); - - // inverted index file writer - for (const auto& [seg_id, idx_file_writer] : dst_writer->_idx_files.get_file_writers()) { - EXPECT_TRUE(idx_file_writer->_closed); - } - - EXPECT_TRUE(rowset_writer->build(rowsets[i]).ok()); - EXPECT_TRUE(_tablet->add_rowset(rowsets[i]).ok()); - EXPECT_TRUE(rowsets[i]->num_segments() == 5); - - // check rowset meta and file - for (int seg_id = 0; seg_id < rowsets[i]->num_segments(); seg_id++) { - const auto& index_info = rowsets[i]->_rowset_meta->inverted_index_file_info(seg_id); - EXPECT_TRUE(index_info.has_index_size()); - const auto& fs = rowsets[i]->_rowset_meta->fs(); - const auto& file_name = fmt::format("{}/{}_{}.idx", rowsets[i]->tablet_path(), - rowsets[i]->rowset_id().to_string(), seg_id); - int64_t file_size = 0; - EXPECT_TRUE(fs->file_size(file_name, &file_size).ok()); - EXPECT_EQ(index_info.index_size(), file_size); - - const auto& seg_path = rowsets[i]->segment_path(seg_id); - EXPECT_TRUE(seg_path.has_value()); - const auto& index_file_path_prefix = - InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); - auto inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>( - fs, std::string(index_file_path_prefix), - _tablet_schema->get_inverted_index_storage_format(), index_info); - EXPECT_TRUE(inverted_index_file_reader->init().ok()); - const auto& dirs = inverted_index_file_reader->get_all_directories(); - EXPECT_TRUE(dirs.has_value()); - EXPECT_EQ(dirs.value().size(), 4); - } - } - - // create delete predicate rowset and add to tablet - auto delete_rowset = create_delete_predicate_rowset(_tablet_schema, "v1='great'", inc_id++); - EXPECT_TRUE(_tablet->add_rowset(delete_rowset).ok()); - EXPECT_TRUE(_tablet->rowset_map().size() == 3); - rowsets.push_back(delete_rowset); - EXPECT_TRUE(rowsets.size() == 3); - - auto output_rowset_index = do_compaction(rowsets, _engine_ref, _tablet, true); - const auto& seg_path = output_rowset_index->segment_path(0); - EXPECT_TRUE(seg_path.has_value()); - const auto& index_file_path_prefix = - InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); - auto inverted_index_file_reader_index = std::make_shared<InvertedIndexFileReader>( - output_rowset_index->_rowset_meta->fs(), std::string(index_file_path_prefix), - _tablet_schema->get_inverted_index_storage_format()); - EXPECT_TRUE(inverted_index_file_reader_index->init().ok()); - - auto output_rowset_normal = do_compaction(rowsets, _engine_ref, _tablet, false); - const auto& seg_path_normal = output_rowset_normal->segment_path(0); - EXPECT_TRUE(seg_path_normal.has_value()); - const auto& index_file_path_prefix_normal = - InvertedIndexDescriptor::get_index_file_path_prefix(seg_path_normal.value()); - auto inverted_index_file_reader_normal = std::make_shared<InvertedIndexFileReader>( - output_rowset_normal->_rowset_meta->fs(), std::string(index_file_path_prefix_normal), - _tablet_schema->get_inverted_index_storage_format()); - EXPECT_TRUE(inverted_index_file_reader_normal->init().ok()); - - // check index file terms - auto dir_idx_compaction = inverted_index_file_reader_index->_open(10001, ""); - auto dir_normal_compaction = inverted_index_file_reader_normal->_open(10001, ""); - check_terms_stats(dir_idx_compaction->get()); - check_terms_stats(dir_normal_compaction->get()); - auto st = check_idx_file_correctness(dir_idx_compaction->get(), dir_normal_compaction->get()); - EXPECT_TRUE(st.ok()) << st.to_string(); - - // check meta and file - check_meta_and_file(output_rowset_index); - check_meta_and_file(output_rowset_normal); -} - -} // namespace doris diff --git a/be/test/olap/rowset/segment_v2/inverted_index/compaction/util/index_compaction_utils.cpp b/be/test/olap/rowset/segment_v2/inverted_index/compaction/util/index_compaction_utils.cpp new file mode 100644 index 00000000000..530dca8054c --- /dev/null +++ b/be/test/olap/rowset/segment_v2/inverted_index/compaction/util/index_compaction_utils.cpp @@ -0,0 +1,621 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <fstream> +#include <iomanip> +#include <iostream> +#include <memory> +#include <sstream> +#include <vector> + +#include "CLucene/StdHeader.h" +#include "CLucene/config/repl_wchar.h" +#include "json2pb/json_to_pb.h" +#include "json2pb/pb_to_json.h" +#include "olap/base_compaction.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/segment_v2/inverted_index/query/query_factory.h" +#include "olap/rowset/segment_v2/inverted_index_file_reader.h" +#include "olap/storage_engine.h" + +namespace doris { + +static int64_t inc_id = 1000; +const static std::string expected_output = + "Max Docs: 2000\n" + "Num Docs: 2000\n" + "Field: 1 Term: bad Freq: 196\n" + "Field: 1 Term: excellent Freq: 227\n" + "Field: 1 Term: fine Freq: 190\n" + "Field: 1 Term: good Freq: 197\n" + "Field: 1 Term: great Freq: 194\n" + "Field: 1 Term: maybe Freq: 191\n" + "Field: 1 Term: no Freq: 205\n" + "Field: 1 Term: ok Freq: 175\n" + "Field: 1 Term: terrible Freq: 205\n" + "Field: 1 Term: yes Freq: 220\n" + "Term count: 10\n\n"; +const static std::string expected_delete_output = + "Max Docs: 1806\n" + "Num Docs: 1806\n" + "Field: 1 Term: bad Freq: 196\n" + "Field: 1 Term: excellent Freq: 227\n" + "Field: 1 Term: fine Freq: 190\n" + "Field: 1 Term: good Freq: 197\n" + "Field: 1 Term: maybe Freq: 191\n" + "Field: 1 Term: no Freq: 205\n" + "Field: 1 Term: ok Freq: 175\n" + "Field: 1 Term: terrible Freq: 205\n" + "Field: 1 Term: yes Freq: 220\n" + "Term count: 9\n\n"; + +using QueryData = std::pair<std::vector<std::string>, std::vector<int>>; + +class IndexCompactionUtils { + struct DataRow { + int key; + std::string word; + std::string url; + int num; + }; + + static std::vector<DataRow> read_data(const std::string file_name) { + std::ifstream file(file_name); + EXPECT_TRUE(file.is_open()); + + std::string line; + std::vector<DataRow> data; + + while (std::getline(file, line)) { + std::stringstream ss(line); + std::string item; + DataRow row; + EXPECT_TRUE(std::getline(ss, item, ',')); + row.key = std::stoi(item); + EXPECT_TRUE(std::getline(ss, item, ',')); + row.word = item; + EXPECT_TRUE(std::getline(ss, item, ',')); + row.url = item; + EXPECT_TRUE(std::getline(ss, item, ',')); + row.num = std::stoi(item); + data.emplace_back(std::move(row)); + } + + file.close(); + return data; + } + + static bool query_bkd(const TabletIndex* index, + std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, + const std::vector<int>& query_data, + const std::vector<int>& query_result) { + const auto& idx_reader = BkdIndexReader::create_shared(index, inverted_index_file_reader); + const auto& index_searcher_builder = std::make_unique<BKDIndexSearcherBuilder>(); + auto dir = inverted_index_file_reader->open(index); + EXPECT_TRUE(dir.has_value()); + auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); + EXPECT_TRUE(searcher_result.has_value()); + auto bkd_searcher = std::get_if<BKDIndexSearcherPtr>(&searcher_result.value()); + EXPECT_TRUE(bkd_searcher != nullptr); + idx_reader->_type_info = get_scalar_type_info((FieldType)(*bkd_searcher)->type); + EXPECT_TRUE(idx_reader->_type_info != nullptr); + idx_reader->_value_key_coder = get_key_coder(idx_reader->_type_info->type()); + + for (int i = 0; i < query_data.size(); i++) { + vectorized::Field param_value = int32_t(query_data[i]); + std::unique_ptr<segment_v2::InvertedIndexQueryParamFactory> query_param = nullptr; + EXPECT_TRUE(segment_v2::InvertedIndexQueryParamFactory::create_query_value( + PrimitiveType::TYPE_INT, ¶m_value, query_param) + .ok()); + auto result = std::make_shared<roaring::Roaring>(); + EXPECT_TRUE(idx_reader + ->invoke_bkd_query(query_param->get_value(), + InvertedIndexQueryType::EQUAL_QUERY, + *bkd_searcher, result) + .ok()); + EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; + } + return true; + } + + static bool query_string(const TabletIndex* index, + std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, + const std::string& column_name, + const std::vector<std::string>& query_data, + const std::vector<int>& query_result) { + const auto& idx_reader = + StringTypeInvertedIndexReader::create_shared(index, inverted_index_file_reader); + const auto& index_searcher_builder = std::make_unique<FulltextIndexSearcherBuilder>(); + auto dir = inverted_index_file_reader->open(index); + EXPECT_TRUE(dir.has_value()); + auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); + EXPECT_TRUE(searcher_result.has_value()); + auto string_searcher = std::get_if<FulltextIndexSearcherPtr>(&searcher_result.value()); + EXPECT_TRUE(string_searcher != nullptr); + std::wstring column_name_ws = StringUtil::string_to_wstring(column_name); + + for (int i = 0; i < query_data.size(); i++) { + TQueryOptions queryOptions; + auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY, *string_searcher, + queryOptions, nullptr); + EXPECT_TRUE(query != nullptr); + InvertedIndexQueryInfo query_info; + query_info.field_name = column_name_ws; + query_info.terms.emplace_back(query_data[i]); + query->add(query_info); + auto result = std::make_shared<roaring::Roaring>(); + query->search(*result); + EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; + } + return true; + } + + static bool query_fulltext(const TabletIndex* index, + std::shared_ptr<InvertedIndexFileReader>& inverted_index_file_reader, + const std::string& column_name, + const std::vector<std::string>& query_data, + const std::vector<int>& query_result) { + const auto& idx_reader = + FullTextIndexReader::create_shared(index, inverted_index_file_reader); + const auto& index_searcher_builder = std::make_unique<FulltextIndexSearcherBuilder>(); + auto dir = inverted_index_file_reader->open(index); + EXPECT_TRUE(dir.has_value()); + auto searcher_result = index_searcher_builder->get_index_searcher(dir.value().release()); + EXPECT_TRUE(searcher_result.has_value()); + auto string_searcher = std::get_if<FulltextIndexSearcherPtr>(&searcher_result.value()); + EXPECT_TRUE(string_searcher != nullptr); + std::wstring column_name_ws = StringUtil::string_to_wstring(column_name); + + for (int i = 0; i < query_data.size(); i++) { + TQueryOptions queryOptions; + auto query = QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, + *string_searcher, queryOptions, nullptr); + EXPECT_TRUE(query != nullptr); + InvertedIndexQueryInfo query_info; + query_info.field_name = column_name_ws; + query_info.terms.emplace_back(query_data[i]); + query->add(query_info); + auto result = std::make_shared<roaring::Roaring>(); + query->search(*result); + EXPECT_EQ(query_result[i], result->cardinality()) << query_data[i]; + } + return true; + } + + static void check_terms_stats(lucene::store::Directory* dir, std::ostream& os = std::cout) { + IndexReader* r = IndexReader::open(dir); + + os << "Max Docs: " << r->maxDoc() << "\n"; + os << "Num Docs: " << r->numDocs() << "\n"; + + TermEnum* te = r->terms(); + int32_t nterms; + for (nterms = 0; te->next(); nterms++) { + std::string token = + lucene_wcstoutf8string(te->term(false)->text(), te->term(false)->textLength()); + std::string field = lucene_wcstoutf8string(te->term(false)->field(), + lenOfString(te->term(false)->field())); + + os << "Field: " << field << " "; + os << "Term: " << token << " "; + os << "Freq: " << te->docFreq() << "\n"; + if (false) { + TermDocs* td = r->termDocs(te->term()); + while (td->next()) { + os << "DocID: " << td->doc() << " "; + os << "TermFreq: " << td->freq() << "\n"; + } + _CLLDELETE(td); + } + } + os << "Term count: " << nterms << "\n\n"; + te->close(); + _CLLDELETE(te); + + r->close(); + _CLLDELETE(r); + } + static Status check_idx_file_correctness(lucene::store::Directory* index_reader, + lucene::store::Directory* tmp_index_reader) { + lucene::index::IndexReader* idx_reader = lucene::index::IndexReader::open(index_reader); + lucene::index::IndexReader* tmp_idx_reader = + lucene::index::IndexReader::open(tmp_index_reader); + + // compare numDocs + if (idx_reader->numDocs() != tmp_idx_reader->numDocs()) { + return Status::InternalError( + "index compaction correctness check failed, numDocs not equal, idx_numDocs={}, " + "tmp_idx_numDocs={}", + idx_reader->numDocs(), tmp_idx_reader->numDocs()); + } + + lucene::index::TermEnum* term_enum = idx_reader->terms(); + lucene::index::TermEnum* tmp_term_enum = tmp_idx_reader->terms(); + lucene::index::TermDocs* term_docs = nullptr; + lucene::index::TermDocs* tmp_term_docs = nullptr; + + // iterate TermEnum + while (term_enum->next() && tmp_term_enum->next()) { + std::string token = lucene_wcstoutf8string(term_enum->term(false)->text(), + term_enum->term(false)->textLength()); + std::string field = lucene_wcstoutf8string( + term_enum->term(false)->field(), lenOfString(term_enum->term(false)->field())); + std::string tmp_token = lucene_wcstoutf8string( + tmp_term_enum->term(false)->text(), tmp_term_enum->term(false)->textLength()); + std::string tmp_field = + lucene_wcstoutf8string(tmp_term_enum->term(false)->field(), + lenOfString(tmp_term_enum->term(false)->field())); + // compare token and field + if (field != tmp_field) { + return Status::InternalError( + "index compaction correctness check failed, fields not equal, field={}, " + "tmp_field={}", + field, field); + } + if (token != tmp_token) { + return Status::InternalError( + "index compaction correctness check failed, tokens not equal, token={}, " + "tmp_token={}", + token, tmp_token); + } + + // get term's docId and freq + term_docs = idx_reader->termDocs(term_enum->term(false)); + tmp_term_docs = tmp_idx_reader->termDocs(tmp_term_enum->term(false)); + + // compare term's docId and freq + while (term_docs->next() && tmp_term_docs->next()) { + if (term_docs->doc() != tmp_term_docs->doc() || + term_docs->freq() != tmp_term_docs->freq()) { + return Status::InternalError( + "index compaction correctness check failed, docId or freq not equal, " + "docId={}, tmp_docId={}, freq={}, tmp_freq={}", + term_docs->doc(), tmp_term_docs->doc(), term_docs->freq(), + tmp_term_docs->freq()); + } + } + + // check if there are remaining docs + if (term_docs->next() || tmp_term_docs->next()) { + return Status::InternalError( + "index compaction correctness check failed, number of docs not equal for " + "term={}, tmp_term={}", + token, tmp_token); + } + if (term_docs) { + term_docs->close(); + _CLLDELETE(term_docs); + } + if (tmp_term_docs) { + tmp_term_docs->close(); + _CLLDELETE(tmp_term_docs); + } + } + + // check if there are remaining terms + if (term_enum->next() || tmp_term_enum->next()) { + return Status::InternalError( + "index compaction correctness check failed, number of terms not equal"); + } + if (term_enum) { + term_enum->close(); + _CLLDELETE(term_enum); + } + if (tmp_term_enum) { + tmp_term_enum->close(); + _CLLDELETE(tmp_term_enum); + } + if (idx_reader) { + idx_reader->close(); + _CLLDELETE(idx_reader); + } + if (tmp_idx_reader) { + tmp_idx_reader->close(); + _CLLDELETE(tmp_idx_reader); + } + return Status::OK(); + } + + static Status do_compaction( + const std::vector<RowsetSharedPtr>& rowsets, StorageEngine* engine_ref, + const TabletSharedPtr& tablet, bool is_index_compaction, RowsetSharedPtr& rowset_ptr, + const std::function<void(const BaseCompaction&, const RowsetWriterContext&)> + custom_check = nullptr) { + config::inverted_index_compaction_enable = is_index_compaction; + // only base compaction can handle delete predicate + BaseCompaction compaction(*engine_ref, tablet); + compaction._input_rowsets = std::move(rowsets); + compaction.build_basic_info(); + + std::vector<RowsetReaderSharedPtr> input_rs_readers; + create_input_rowsets_readers(compaction, input_rs_readers); + + RowsetWriterContext ctx; + RETURN_IF_ERROR(compaction.construct_output_rowset_writer(ctx)); + + compaction._stats.rowid_conversion = compaction._rowid_conversion.get(); + RETURN_IF_ERROR(Merger::vertical_merge_rowsets( + tablet, compaction.compaction_type(), *(compaction._cur_tablet_schema), + input_rs_readers, compaction._output_rs_writer.get(), 100000, 5, + &compaction._stats)); + + const auto& dst_writer = + dynamic_cast<BaseBetaRowsetWriter*>(compaction._output_rs_writer.get()); + check_idx_file_writer_closed(dst_writer, false); + + RETURN_IF_ERROR(compaction.do_inverted_index_compaction()); + + RETURN_IF_ERROR(compaction._output_rs_writer->build(compaction._output_rowset)); + + check_idx_file_writer_closed(dst_writer, true); + + if (custom_check) { + custom_check(compaction, ctx); + } + + rowset_ptr = std::move(compaction._output_rowset); + return Status::OK(); + } + + static void create_input_rowsets_readers(const BaseCompaction& compaction, + std::vector<RowsetReaderSharedPtr>& input_rs_readers) { + input_rs_readers.reserve(compaction._input_rowsets.size()); + for (auto& rowset : compaction._input_rowsets) { + RowsetReaderSharedPtr rs_reader; + EXPECT_TRUE(rowset->create_reader(&rs_reader).ok()); + input_rs_readers.push_back(std::move(rs_reader)); + } + } + + static void check_idx_file_writer_closed(BaseBetaRowsetWriter* writer, bool closed) { + for (const auto& [seg_id, idx_file_writer] : writer->inverted_index_file_writers()) { + EXPECT_EQ(idx_file_writer->_closed, closed); + } + } + + static void init_rs_meta(RowsetMetaSharedPtr& rs_meta, int64_t start, int64_t end) { + std::string json_rowset_meta = R"({ + "rowset_id": 540081, + "tablet_id": 15673, + "partition_id": 10000, + "tablet_schema_hash": 567997577, + "rowset_type": "BETA_ROWSET", + "rowset_state": "VISIBLE", + "empty": false + })"; + RowsetMetaPB rowset_meta_pb; + json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb); + rowset_meta_pb.set_start_version(start); + rowset_meta_pb.set_end_version(end); + rs_meta->init_from_pb(rowset_meta_pb); + } + + static RowsetSharedPtr create_delete_predicate_rowset(const TabletSchemaSPtr& schema, + std::string pred, int64_t version) { + DeletePredicatePB del_pred; + del_pred.add_sub_predicates(pred); + del_pred.set_version(1); + RowsetMetaSharedPtr rsm(new RowsetMeta()); + init_rs_meta(rsm, version, version); + RowsetId id; + id.init(version); + rsm->set_rowset_id(id); + rsm->set_delete_predicate(std::move(del_pred)); + rsm->set_tablet_schema(schema); + return std::make_shared<BetaRowset>(schema, rsm, ""); + } + + static void construct_column(ColumnPB* column_pb, TabletIndexPB* tablet_index, int64_t index_id, + const std::string& index_name, int32_t col_unique_id, + const std::string& column_type, const std::string& column_name, + bool parser = false) { + column_pb->set_unique_id(col_unique_id); + column_pb->set_name(column_name); + column_pb->set_type(column_type); + column_pb->set_is_key(false); + column_pb->set_is_nullable(true); + tablet_index->set_index_id(index_id); + tablet_index->set_index_name(index_name); + tablet_index->set_index_type(IndexType::INVERTED); + tablet_index->add_col_unique_id(col_unique_id); + if (parser) { + auto* properties = tablet_index->mutable_properties(); + (*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE; + } + } + + static void construct_column(ColumnPB* column_pb, int32_t col_unique_id, + const std::string& column_type, const std::string& column_name) { + column_pb->set_unique_id(col_unique_id); + column_pb->set_name(column_name); + column_pb->set_type(column_type); + column_pb->set_is_key(false); + column_pb->set_is_nullable(true); + } + + static void construct_index(TabletIndexPB* tablet_index, int64_t index_id, + const std::string& index_name, int32_t col_unique_id, + bool parser = false) { + tablet_index->set_index_id(index_id); + tablet_index->set_index_name(index_name); + tablet_index->set_index_type(IndexType::INVERTED); + tablet_index->add_col_unique_id(col_unique_id); + if (parser) { + auto* properties = tablet_index->mutable_properties(); + (*properties)[INVERTED_INDEX_PARSER_KEY] = INVERTED_INDEX_PARSER_UNICODE; + } + } + + static void check_meta_and_file(const RowsetSharedPtr& output_rowset, + const TabletSchemaSPtr& tablet_schema, + const std::map<int, QueryData>& query_map) { + CHECK_EQ(output_rowset->num_segments(), 1); + // check rowset meta and file + int seg_id = 0; + // meta + const auto& index_info = output_rowset->_rowset_meta->inverted_index_file_info(seg_id); + EXPECT_TRUE(index_info.has_index_size()); + const auto& fs = output_rowset->_rowset_meta->fs(); + const auto& file_name = fmt::format("{}/{}_{}.idx", output_rowset->tablet_path(), + output_rowset->rowset_id().to_string(), seg_id); + int64_t file_size = 0; + EXPECT_TRUE(fs->file_size(file_name, &file_size).ok()); + EXPECT_EQ(index_info.index_size(), file_size); + + // file + const auto& seg_path = output_rowset->segment_path(seg_id); + EXPECT_TRUE(seg_path.has_value()); + const auto& index_file_path_prefix = + InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); + auto inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>( + fs, std::string(index_file_path_prefix), + tablet_schema->get_inverted_index_storage_format(), index_info); + EXPECT_TRUE(inverted_index_file_reader->init().ok()); + const auto& dirs = inverted_index_file_reader->get_all_directories(); + EXPECT_TRUE(dirs.has_value()); + EXPECT_EQ(dirs.value().size(), 4); + + for (const auto& [col_uid, query_data] : query_map) { + const auto& column = tablet_schema->column_by_uid(col_uid); + const auto* index = tablet_schema->inverted_index(column); + EXPECT_TRUE(index != nullptr); + + if (col_uid == 0 || col_uid == 3) { + // BKD index + std::vector<int> query_data_int; + for (const auto& data : query_data.first) { + query_data_int.push_back(std::stoi(data)); + } + EXPECT_TRUE(query_bkd(index, inverted_index_file_reader, query_data_int, + query_data.second)); + } else if (col_uid == 1) { + // String index + EXPECT_TRUE(query_string(index, inverted_index_file_reader, std::to_string(col_uid), + query_data.first, query_data.second)); + } else if (col_uid == 2) { + // Fulltext index + EXPECT_TRUE(query_fulltext(index, inverted_index_file_reader, + std::to_string(col_uid), query_data.first, + query_data.second)); + } + } + } + + static RowsetWriterContext rowset_writer_context(const std::unique_ptr<DataDir>& data_dir, + const TabletSchemaSPtr& schema, + const std::string& tablet_path) { + RowsetWriterContext context; + RowsetId rowset_id; + rowset_id.init(inc_id); + context.rowset_id = rowset_id; + context.rowset_type = BETA_ROWSET; + context.data_dir = data_dir.get(); + context.rowset_state = VISIBLE; + context.tablet_schema = schema; + context.tablet_path = tablet_path; + context.version = Version(inc_id, inc_id); + context.max_rows_per_segment = 200; + inc_id++; + return context; + } + + static void build_rowsets(const std::unique_ptr<DataDir>& data_dir, + const TabletSchemaSPtr& schema, const TabletSharedPtr& tablet, + StorageEngine* engine_ref, std::vector<RowsetSharedPtr>& rowsets, + const std::vector<std::string>& data_files, + const std::function<void(const int32_t&)> custom_check = nullptr) { + std::vector<std::vector<DataRow>> data; + for (auto file : data_files) { + data.emplace_back(read_data(file)); + } + for (int i = 0; i < data.size(); i++) { + const auto& res = RowsetFactory::create_rowset_writer( + *engine_ref, rowset_writer_context(data_dir, schema, tablet->tablet_path()), + false); + EXPECT_TRUE(res.has_value()) << res.error(); + const auto& rowset_writer = res.value(); + + vectorized::Block block = schema->create_block(); + auto columns = block.mutate_columns(); + for (const auto& row : data[i]) { + vectorized::Field key = int32_t(row.key); + vectorized::Field v1(row.word); + vectorized::Field v2(row.url); + vectorized::Field v3 = int32_t(row.num); + columns[0]->insert(key); + columns[1]->insert(v1); + columns[2]->insert(v2); + columns[3]->insert(v3); + } + EXPECT_TRUE(rowset_writer->add_block(&block).ok()); + EXPECT_TRUE(rowset_writer->flush().ok()); + const auto& dst_writer = dynamic_cast<BaseBetaRowsetWriter*>(rowset_writer.get()); + + check_idx_file_writer_closed(dst_writer, true); + + EXPECT_TRUE(rowset_writer->build(rowsets[i]).ok()); + EXPECT_TRUE(tablet->add_rowset(rowsets[i]).ok()); + EXPECT_TRUE(rowsets[i]->num_segments() == 5); + + // check rowset meta and file + for (int seg_id = 0; seg_id < rowsets[i]->num_segments(); seg_id++) { + const auto& index_info = rowsets[i]->_rowset_meta->inverted_index_file_info(seg_id); + EXPECT_TRUE(index_info.has_index_size()); + const auto& fs = rowsets[i]->_rowset_meta->fs(); + const auto& file_name = fmt::format("{}/{}_{}.idx", rowsets[i]->tablet_path(), + rowsets[i]->rowset_id().to_string(), seg_id); + int64_t file_size = 0; + EXPECT_TRUE(fs->file_size(file_name, &file_size).ok()); + EXPECT_EQ(index_info.index_size(), file_size); + + const auto& seg_path = rowsets[i]->segment_path(seg_id); + EXPECT_TRUE(seg_path.has_value()); + const auto& index_file_path_prefix = + InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value()); + auto inverted_index_file_reader = std::make_shared<InvertedIndexFileReader>( + fs, std::string(index_file_path_prefix), + schema->get_inverted_index_storage_format(), index_info); + EXPECT_TRUE(inverted_index_file_reader->init().ok()); + const auto& dirs = inverted_index_file_reader->get_all_directories(); + EXPECT_TRUE(dirs.has_value()); + if (custom_check) { + custom_check(dirs.value().size()); + } + } + } + } + + static std::shared_ptr<InvertedIndexFileReader> init_index_file_reader( + const RowsetSharedPtr& output_rowset, const std::string& seg_path, + const InvertedIndexStorageFormatPB& index_storage_format) { + const auto& index_file_path_prefix = + InvertedIndexDescriptor::get_index_file_path_prefix(seg_path); + auto inverted_index_file_reader_index = std::make_shared<InvertedIndexFileReader>( + output_rowset->_rowset_meta->fs(), std::string(index_file_path_prefix), + index_storage_format); + auto st = inverted_index_file_reader_index->init(); + EXPECT_TRUE(st.ok()) << st.to_string(); + + return inverted_index_file_reader_index; + } +}; + +} // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org