This is an automated email from the ASF dual-hosted git repository. airborne pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 298185d25b8 [opt](inverted index) add option for inverted index ram dir writing when compaction (#49232) 298185d25b8 is described below commit 298185d25b8f833c569bdcbf7dbb891b214ed750 Author: airborne12 <jiang...@selectdb.com> AuthorDate: Mon Mar 31 11:17:43 2025 +0800 [opt](inverted index) add option for inverted index ram dir writing when compaction (#49232) Problem Summary: This pull request introduces a new configuration option to control whether the RAM directory is used during compaction in the inverted index. It includes updates to configuration files, modifications to the `RowsetWriter` and `InvertedIndexFileWriter` classes, and new tests to verify the functionality. --- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/olap/compaction.cpp | 2 + be/src/olap/rowset/rowset_writer.h | 8 +- be/src/olap/rowset/rowset_writer_context.h | 2 + .../segment_v2/inverted_index_file_writer.cpp | 3 +- .../rowset/segment_v2/inverted_index_file_writer.h | 6 +- .../segment_v2/inverted_index_file_writer_test.cpp | 253 +++++++++++++++++++++ 8 files changed, 273 insertions(+), 5 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c5b84f0ccfe..1896a6085a7 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1110,6 +1110,8 @@ DEFINE_mBool(inverted_index_compaction_enable, "true"); DEFINE_mBool(debug_inverted_index_compaction, "false"); // index by RAM directory DEFINE_mBool(inverted_index_ram_dir_enable, "true"); +// whether to index by RAM directory during base compaction +DEFINE_mBool(inverted_index_ram_dir_enable_when_base_compaction, "true"); // use num_broadcast_buffer blocks as buffer to do broadcast DEFINE_Int32(num_broadcast_buffer, "32"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 8e4b141986b..30a0123fbe9 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1155,6 +1155,8 @@ 
DECLARE_mBool(inverted_index_compaction_enable); DECLARE_mBool(debug_inverted_index_compaction); // index by RAM directory DECLARE_mBool(inverted_index_ram_dir_enable); +// whether to index by RAM directory during base compaction +DECLARE_mBool(inverted_index_ram_dir_enable_when_base_compaction); // use num_broadcast_buffer blocks as buffer to do broadcast DECLARE_Int32(num_broadcast_buffer); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index efd3a104aea..333f7d26c1c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1088,6 +1088,7 @@ Status CompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; ctx.write_type = DataWriteType::TYPE_COMPACTION; + ctx.compaction_type = compaction_type(); _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); _pending_rs_guard = _engine.add_pending_rowset(ctx); return Status::OK(); @@ -1495,6 +1496,7 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx.tablet_schema = _cur_tablet_schema; ctx.newest_write_timestamp = _newest_write_timestamp; ctx.write_type = DataWriteType::TYPE_COMPACTION; + ctx.compaction_type = compaction_type(); // We presume that the data involved in cumulative compaction is sufficiently 'hot' // and should always be retained in the cache. 
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 0a0d36ea04a..e77f44a0391 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -29,6 +29,7 @@ #include "gen_cpp/olap_file.pb.h" #include "gutil/macros.h" #include "olap/column_mapping.h" +#include "olap/olap_define.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" #include "olap/rowset/segment_v2/inverted_index_file_writer.h" @@ -107,10 +108,15 @@ public: } std::string segment_prefix {InvertedIndexDescriptor::get_index_file_path_prefix( _context.segment_path(segment_id))}; + // default to true, only when base compaction, we need to check the config + bool can_use_ram_dir = true; + if (_context.compaction_type == ReaderType::READER_BASE_COMPACTION) { + can_use_ram_dir = config::inverted_index_ram_dir_enable_when_base_compaction; + } *index_file_writer = std::make_unique<InvertedIndexFileWriter>( _context.fs(), segment_prefix, _context.rowset_id.to_string(), segment_id, _context.tablet_schema->get_inverted_index_storage_format(), - std::move(idx_file_v2_ptr)); + std::move(idx_file_v2_ptr), can_use_ram_dir); return Status::OK(); } diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index cb0fda83e60..a1559a14fd1 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -82,6 +82,8 @@ struct RowsetWriterContext { // store column_unique_id to do index compaction std::set<int32_t> columns_to_do_index_compaction; DataWriteType write_type = DataWriteType::TYPE_DEFAULT; + // need to figure out the sub type of compaction + ReaderType compaction_type = ReaderType::UNKNOWN; BaseTabletSPtr tablet = nullptr; std::shared_ptr<MowContext> mow_context; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 7446b8160d8..d154f10f74d 100644 --- 
a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -61,9 +61,8 @@ Result<std::shared_ptr<DorisFSDirectory>> InvertedIndexFileWriter::open( const TabletIndex* index_meta) { auto local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path( _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), index_meta->get_index_suffix()); - bool can_use_ram_dir = true; auto dir = std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory( - _local_fs, local_fs_index_path.c_str(), can_use_ram_dir)); + _local_fs, local_fs_index_path.c_str(), _can_use_ram_dir)); auto st = _insert_directory_into_map(index_meta->index_id(), index_meta->get_index_suffix(), dir); if (!st.ok()) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index c405d77196e..9495de24493 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -57,14 +57,15 @@ public: InvertedIndexFileWriter(io::FileSystemSPtr fs, std::string index_path_prefix, std::string rowset_id, int64_t seg_id, InvertedIndexStorageFormatPB storage_format, - io::FileWriterPtr file_writer = nullptr) + io::FileWriterPtr file_writer = nullptr, bool can_use_ram_dir = true) : _fs(std::move(fs)), _index_path_prefix(std::move(index_path_prefix)), _rowset_id(std::move(rowset_id)), _seg_id(seg_id), _storage_format(storage_format), _local_fs(io::global_local_filesystem()), - _idx_v2_writer(std::move(file_writer)) { + _idx_v2_writer(std::move(file_writer)), + _can_use_ram_dir(can_use_ram_dir) { auto tmp_file_dir = ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir(); _tmp_dir = tmp_file_dir.native(); } @@ -174,6 +175,7 @@ private: // only once bool _closed = false; + bool _can_use_ram_dir = true; }; } // namespace segment_v2 } // namespace doris diff --git 
a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp index 4024ef8cb6a..d1ddc2edf23 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp @@ -145,6 +145,61 @@ TEST_F(InvertedIndexFileWriterTest, OpenTest) { ASSERT_TRUE(writer._indices_dirs.find(key)->second.get() == dir.get()); } +TEST_F(InvertedIndexFileWriterTest, OpenWithoutRamDirTest) { + // This test verifies that when _can_use_ram_dir is set to false, + // the directory created by InvertedIndexFileWriter::open is not a RAM directory + config::inverted_index_ram_dir_enable_when_base_compaction = false; + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2, nullptr, false); + + int64_t index_id = 1; + std::string index_suffix = "suffix_no_ram"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + // Open the directory with _can_use_ram_dir = false + auto open_result = writer.open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + ASSERT_NE(dir, nullptr); + + // Verify the directory is a regular DorisFSDirectory and not a DorisRAMFSDirectory + // This confirms that _can_use_ram_dir = false works as expected + ASSERT_STREQ(dir->getObjectName(), "DorisFSDirectory"); + ASSERT_STRNE(dir->getObjectName(), "DorisRAMFSDirectory"); + + // Also check that the directory is properly inserted into the _indices_dirs map + auto key = std::make_pair(index_id, index_suffix); + ASSERT_TRUE(writer._indices_dirs.find(key) != writer._indices_dirs.end()); + ASSERT_TRUE(writer._indices_dirs.find(key)->second.get() == dir.get()); +} + +TEST_F(InvertedIndexFileWriterTest, OpenWithRamDirTest) { + // This test verifies the behavior when _can_use_ram_dir is set to true + // Note: In a test environment, 
whether a RAM directory is actually used depends on + // various factors like available memory, file size, etc. + config::inverted_index_ram_dir_enable_when_base_compaction = true; + InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, + InvertedIndexStorageFormatPB::V2, nullptr, true); + + int64_t index_id = 1; + std::string index_suffix = "suffix_with_ram"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + // Open the directory with _can_use_ram_dir = true + auto open_result = writer.open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + ASSERT_NE(dir, nullptr); + // Verify the directory is a DorisRAMFSDirectory and not a regular DorisFSDirectory + ASSERT_STREQ(dir->getObjectName(), "DorisRAMFSDirectory"); + ASSERT_STRNE(dir->getObjectName(), "DorisFSDirectory"); + auto key = std::make_pair(index_id, index_suffix); + ASSERT_TRUE(writer._indices_dirs.find(key) != writer._indices_dirs.end()); + ASSERT_TRUE(writer._indices_dirs.find(key)->second.get() == dir.get()); +} + TEST_F(InvertedIndexFileWriterTest, DeleteIndexTest) { InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id, InvertedIndexStorageFormatPB::V2); @@ -1199,4 +1254,202 @@ TEST_F(InvertedIndexFileWriterTest, AddIntoSearcherCacheV1Test) { config::enable_write_index_searcher_cache = false; } +// Mock RowsetWriter class that implements the necessary methods for testing +class MockRowsetWriter : public RowsetWriter { +public: + MockRowsetWriter(const io::FileSystemSPtr& fs, const std::string& segment_path_prefix, + const RowsetId& rowset_id, TabletSchemaSPtr tablet_schema, + ReaderType compaction_type = ReaderType::READER_QUERY) + : RowsetWriter() { + _context.rowset_id = rowset_id; + _context.tablet_schema = tablet_schema; + _context.compaction_type = compaction_type; + _segment_path_prefix = segment_path_prefix; + } + + Status init(const RowsetWriterContext& 
rowset_writer_context) override { return Status::OK(); } + + Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer, + FileType file_type = FileType::SEGMENT_FILE) override { + std::string index_path = + InvertedIndexDescriptor::get_index_file_path_v2(_segment_path_prefix); + io::FileWriterOptions opts; + Status st = _context.fs()->create_file(index_path, &writer, &opts); + return st; + } + + // Required overrides for abstract base class but not used in our tests + Status add_rowset(RowsetSharedPtr rowset) override { return Status::OK(); } + Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override { + return Status::OK(); + } + Status flush() override { return Status::OK(); } + Status build(RowsetSharedPtr& rowset) override { return Status::OK(); } + RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override { + return nullptr; + } + PUniqueId load_id() override { return PUniqueId(); } + Version version() override { return Version(); } + int64_t num_rows() const override { return 0; } + int64_t num_rows_updated() const override { return 0; } + int64_t num_rows_deleted() const override { return 0; } + int64_t num_rows_new_added() const override { return 0; } + int64_t num_rows_filtered() const override { return 0; } + RowsetId rowset_id() override { return _context.rowset_id; } + RowsetTypePB type() const override { return BETA_ROWSET; } + int32_t allocate_segment_id() override { return 0; } + std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override { return nullptr; } + bool is_partial_update() override { return false; } + +private: + std::string _segment_path_prefix; +}; + +// Test case for rowset writer's create_inverted_index_file_writer with RAM directory disabled +TEST_F(InvertedIndexFileWriterTest, RowsetWriterCreateInvertedIndexFileWriterWithoutRamDir) { + // Set config flag to disable RAM directory + config::inverted_index_ram_dir_enable_when_base_compaction = false; + + // Create a valid 
TabletSchema for testing + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->init_from_pb(tablet_schema_pb); + + // Create a mock rowset writer with base compaction reader type + RowsetId rowset_id; + rowset_id.init(10001); + std::string segment_path_prefix = _absolute_dir + "/test_rowset"; + + MockRowsetWriter writer(_fs, segment_path_prefix, rowset_id, tablet_schema, + ReaderType::READER_BASE_COMPACTION); + + uint32_t segment_id = 1; + InvertedIndexFileWriterPtr index_file_writer; + + // Call the method to test + Status status = writer.create_inverted_index_file_writer(segment_id, &index_file_writer); + ASSERT_TRUE(status.ok()); + ASSERT_NE(index_file_writer, nullptr); + + // Test directory creation with base schema + int64_t index_id = 1; + std::string index_suffix = "suffix_no_ram"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + // Open the directory with _can_use_ram_dir = false (which should be the case due to our config) + auto open_result = index_file_writer->open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + ASSERT_NE(dir, nullptr); + + // Verify directory type (should be DorisFSDirectory, not DorisRAMFSDirectory) + ASSERT_STREQ(dir->getObjectName(), "DorisFSDirectory"); + ASSERT_STRNE(dir->getObjectName(), "DorisRAMFSDirectory"); + + // Cleanup + dir->close(); + status = index_file_writer->close(); + ASSERT_TRUE(status.ok()); +} + +// Test case for rowset writer's create_inverted_index_file_writer with RAM directory enabled +TEST_F(InvertedIndexFileWriterTest, RowsetWriterCreateInvertedIndexFileWriterWithRamDir) { + // Set config flag to enable RAM directory + config::inverted_index_ram_dir_enable_when_base_compaction = true; + + // Create a valid TabletSchema for testing + TabletSchemaPB 
tablet_schema_pb; + tablet_schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->init_from_pb(tablet_schema_pb); + + // Create a mock rowset writer with base compaction reader type + RowsetId rowset_id; + rowset_id.init(10002); + std::string segment_path_prefix = _absolute_dir + "/test_rowset_with_ram"; + + MockRowsetWriter writer(_fs, segment_path_prefix, rowset_id, tablet_schema, + ReaderType::READER_BASE_COMPACTION); + + uint32_t segment_id = 1; + InvertedIndexFileWriterPtr index_file_writer; + + // Call the method to test + Status status = writer.create_inverted_index_file_writer(segment_id, &index_file_writer); + ASSERT_TRUE(status.ok()); + ASSERT_NE(index_file_writer, nullptr); + + // Test directory creation with base schema + int64_t index_id = 1; + std::string index_suffix = "suffix_with_ram"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + // Open the directory with _can_use_ram_dir = true (which should be the case due to our config) + auto open_result = index_file_writer->open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + ASSERT_NE(dir, nullptr); + + // Verify directory type (should be DorisRAMFSDirectory, not DorisFSDirectory) + ASSERT_STREQ(dir->getObjectName(), "DorisRAMFSDirectory"); + ASSERT_STRNE(dir->getObjectName(), "DorisFSDirectory"); + + // Cleanup + dir->close(); + status = index_file_writer->close(); + ASSERT_TRUE(status.ok()); +} + +// Test case for rowset writer's create_inverted_index_file_writer with non-base compaction (should always use RAM) +TEST_F(InvertedIndexFileWriterTest, RowsetWriterCreateInvertedIndexFileWriterNonBaseCompaction) { + // Set config flag to disable RAM directory for base compaction + config::inverted_index_ram_dir_enable_when_base_compaction = false; + + // Create a valid TabletSchema for testing + 
TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V2); + TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); + tablet_schema->init_from_pb(tablet_schema_pb); + + // Create a mock rowset writer with QUERY reader type (not base compaction) + RowsetId rowset_id; + rowset_id.init(10003); + std::string segment_path_prefix = _absolute_dir + "/test_rowset_query"; + + MockRowsetWriter writer(_fs, segment_path_prefix, rowset_id, tablet_schema, + ReaderType::READER_QUERY); // Not base compaction + + uint32_t segment_id = 1; + InvertedIndexFileWriterPtr index_file_writer; + + // Call the method to test + Status status = writer.create_inverted_index_file_writer(segment_id, &index_file_writer); + ASSERT_TRUE(status.ok()); + ASSERT_NE(index_file_writer, nullptr); + + // Test directory creation + int64_t index_id = 1; + std::string index_suffix = "suffix_query"; + auto index_meta = create_mock_tablet_index(index_id, index_suffix); + ASSERT_NE(index_meta, nullptr); + + // Open the directory - should still use RAM dir since this is not base compaction + auto open_result = index_file_writer->open(index_meta.get()); + ASSERT_TRUE(open_result.has_value()); + auto dir = open_result.value(); + ASSERT_NE(dir, nullptr); + + // Verify directory type (should be DorisRAMFSDirectory since it's not base compaction) + ASSERT_STREQ(dir->getObjectName(), "DorisRAMFSDirectory"); + ASSERT_STRNE(dir->getObjectName(), "DorisFSDirectory"); + + // Cleanup + dir->close(); + status = index_file_writer->close(); + ASSERT_TRUE(status.ok()); +} + } // namespace doris::segment_v2 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org