This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 581faa3221e [Refactor](inverted index) refactor inverted index file 
writer for v1/v2 index write (#42328)
581faa3221e is described below

commit 581faa3221e949b1521a5a31a86799fac0557d70
Author: airborne12 <jiang...@selectdb.com>
AuthorDate: Thu Nov 14 21:54:07 2024 +0800

    [Refactor](inverted index) refactor inverted index file writer for v1/v2 
index write (#42328)
    
    ## Proposed changes
    
    The original write_v1 and write_v2 methods in the inverted index file
    writer were overly long and difficult to maintain. This PR refactors
    these methods to improve code maintainability and readability.
---
 be/src/olap/compaction.cpp                         |   6 +-
 .../segment_v2/inverted_index_compaction.cpp       |   7 -
 .../segment_v2/inverted_index_file_writer.cpp      | 521 ++++++++++++---------
 .../rowset/segment_v2/inverted_index_file_writer.h |  68 ++-
 .../rowset/segment_v2/inverted_index_writer.cpp    |   4 +-
 .../segment_v2/inverted_index_file_writer_test.cpp | 515 ++++++++++++++++++++
 6 files changed, 875 insertions(+), 246 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9b46782f84f..2e447d593bc 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -666,9 +666,11 @@ Status Compaction::do_inverted_index_compaction() {
                         
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
             }
             for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; 
dest_segment_id++) {
-                auto* dest_dir =
+                auto dest_dir =
                         
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
-                dest_index_dirs[dest_segment_id] = dest_dir;
+                // dest_index_dirs only holds non-owning pointers, so they do not need to 
be destroyed here;
+                // the lifetime of each destination directory is managed by 
inverted_index_file_writers.
+                dest_index_dirs[dest_segment_id] = dest_dir.get();
             }
             auto st = compact_column(index_meta->index_id(), src_idx_dirs, 
dest_index_dirs,
                                      index_tmp_path.native(), trans_vec, 
dest_segment_num_rows);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index 88a8f241722..f988c46c027 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -76,13 +76,6 @@ Status compact_column(int64_t index_id,
     // when index_writer is destroyed, if closeDir is set, dir will be close
     // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir 
will be destroyed.
     _CLDECDELETE(dir)
-    for (auto* d : dest_index_dirs) {
-        if (d != nullptr) {
-            // NOTE: DO NOT close dest dir here, because it will be closed 
when dest index writer finalize.
-            //d->close();
-            //_CLDELETE(d);
-        }
-    }
 
     // delete temporary segment_path, only when inverted_index_ram_dir_enable 
is false
     if (!config::inverted_index_ram_dir_enable) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index 5599faa351d..74f7398ea4a 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -19,17 +19,14 @@
 
 #include <glog/logging.h>
 
+#include <algorithm>
 #include <filesystem>
 
 #include "common/status.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/local_file_system.h"
-#include "olap/rowset/segment_v2/inverted_index_cache.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
 #include "olap/rowset/segment_v2/inverted_index_reader.h"
 #include "olap/tablet_schema.h"
-#include "runtime/exec_env.h"
 
 namespace doris::segment_v2 {
 
@@ -38,32 +35,11 @@ Status 
InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_di
     return Status::OK();
 }
 
-Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex* 
index_meta) {
-    auto tmp_file_dir = 
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
-    const auto& local_fs = io::global_local_filesystem();
-    auto local_fs_index_path = 
InvertedIndexDescriptor::get_temporary_index_path(
-            tmp_file_dir.native(), _rowset_id, _seg_id, index_meta->index_id(),
-            index_meta->get_index_suffix());
-    bool exists = false;
-    auto st = local_fs->exists(local_fs_index_path, &exists);
-    DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_error",
-                    { st = Status::Error<ErrorCode::IO_ERROR>("debug point: no 
such file error"); })
-    if (!st.ok()) {
-        LOG(ERROR) << "index_path:" << local_fs_index_path << " exists error:" 
<< st;
-        return ResultError(st);
-    }
-    DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_true", { 
exists = true; })
-    if (exists) {
-        LOG(ERROR) << "try to init a directory:" << local_fs_index_path << " 
already exists";
-        return ResultError(
-                Status::InternalError("InvertedIndexFileWriter::open directory 
already exists"));
-    }
-
-    bool can_use_ram_dir = true;
-    auto* dir = DorisFSDirectoryFactory::getDirectory(local_fs, 
local_fs_index_path.c_str(),
-                                                      can_use_ram_dir);
-    auto key = std::make_pair(index_meta->index_id(), 
index_meta->get_index_suffix());
-    auto [it, inserted] = _indices_dirs.emplace(key, 
std::unique_ptr<DorisFSDirectory>(dir));
+Status InvertedIndexFileWriter::_insert_directory_into_map(int64_t index_id,
+                                                           const std::string& 
index_suffix,
+                                                           
std::shared_ptr<DorisFSDirectory> dir) {
+    auto key = std::make_pair(index_id, index_suffix);
+    auto [it, inserted] = _indices_dirs.emplace(key, std::move(dir));
     if (!inserted) {
         LOG(ERROR) << "InvertedIndexFileWriter::open attempted to insert a 
duplicate key: ("
                    << key.first << ", " << key.second << ")";
@@ -71,8 +47,23 @@ Result<DorisFSDirectory*> 
InvertedIndexFileWriter::open(const TabletIndex* index
         for (const auto& entry : _indices_dirs) {
             LOG(ERROR) << "Key: (" << entry.first.first << ", " << 
entry.first.second << ")";
         }
-        return ResultError(Status::InternalError(
-                "InvertedIndexFileWriter::open attempted to insert a duplicate 
dir"));
+        return Status::InternalError(
+                "InvertedIndexFileWriter::open attempted to insert a duplicate 
dir");
+    }
+    return Status::OK();
+}
+
+Result<std::shared_ptr<DorisFSDirectory>> InvertedIndexFileWriter::open(
+        const TabletIndex* index_meta) {
+    auto local_fs_index_path = 
InvertedIndexDescriptor::get_temporary_index_path(
+            _tmp_dir, _rowset_id, _seg_id, index_meta->index_id(), 
index_meta->get_index_suffix());
+    bool can_use_ram_dir = true;
+    auto dir = 
std::shared_ptr<DorisFSDirectory>(DorisFSDirectoryFactory::getDirectory(
+            _local_fs, local_fs_index_path.c_str(), can_use_ram_dir));
+    auto st =
+            _insert_directory_into_map(index_meta->index_id(), 
index_meta->get_index_suffix(), dir);
+    if (!st.ok()) {
+        return ResultError(st);
     }
 
     return dir;
@@ -222,7 +213,7 @@ void InvertedIndexFileWriter::copyFile(const char* 
fileName, lucene::store::Dire
     int64_t chunk = bufferLength;
 
     while (remainder > 0) {
-        int64_t len = std::min(std::min(chunk, length), remainder);
+        int64_t len = std::min({chunk, length, remainder});
         input->readBytes(buffer, len);
         output->writeBytes(buffer, len);
         remainder -= len;
@@ -252,125 +243,46 @@ void InvertedIndexFileWriter::copyFile(const char* 
fileName, lucene::store::Dire
 
 Status InvertedIndexFileWriter::write_v1() {
     int64_t total_size = 0;
+    lucene::store::Directory* out_dir = nullptr;
+    std::unique_ptr<lucene::store::IndexOutput> output = nullptr;
     for (const auto& entry : _indices_dirs) {
         const int64_t index_id = entry.first.first;
         const auto& index_suffix = entry.first.second;
         try {
-            const auto& directory = entry.second;
-            std::vector<std::string> files;
-            directory->list(&files);
-            // remove write.lock file
-            auto it = std::find(files.begin(), files.end(), 
DorisFSDirectory::WRITE_LOCK_FILE);
-            if (it != files.end()) {
-                files.erase(it);
-            }
+            const auto& directory = entry.second.get();
 
-            std::vector<FileInfo> sorted_files;
-            for (auto file : files) {
-                FileInfo file_info;
-                file_info.filename = file;
-                file_info.filesize = directory->fileLength(file.c_str());
-                sorted_files.emplace_back(std::move(file_info));
-            }
-            sort_files(sorted_files);
-
-            int32_t file_count = sorted_files.size();
-
-            io::Path cfs_path(InvertedIndexDescriptor::get_index_file_path_v1(
-                    _index_path_prefix, index_id, index_suffix));
-            auto idx_path = cfs_path.parent_path();
-            std::string idx_name = cfs_path.filename();
-            // write file entries to ram directory to get header length
-            lucene::store::RAMDirectory ram_dir;
-            auto* out_idx = ram_dir.createOutput(idx_name.c_str());
-            
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_ram_output_is_nullptr",
-                            { out_idx = nullptr; })
-            if (out_idx == nullptr) {
-                LOG(WARNING) << "Write compound file error: RAMDirectory 
output is nullptr.";
-                _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
-            }
+            // Prepare sorted file list
+            auto sorted_files = prepare_sorted_files(directory);
+
+            // Calculate header length
+            auto [header_length, header_file_count] =
+                    calculate_header_length(sorted_files, directory);
+
+            // Create output stream
+            auto result = create_output_stream_v1(index_id, index_suffix);
+            out_dir = result.first;
+            output = std::move(result.second);
 
-            std::unique_ptr<lucene::store::IndexOutput> ram_output(out_idx);
-            ram_output->writeVInt(file_count);
-            // write file entries in ram directory
-            // number of files, which data are in header
-            int header_file_count = 0;
-            int64_t header_file_length = 0;
-            const int64_t buffer_length = 16384;
-            uint8_t ram_buffer[buffer_length];
-            for (auto file : sorted_files) {
-                ram_output->writeString(file.filename); // file name
-                ram_output->writeLong(0);               // data offset
-                ram_output->writeLong(file.filesize);   // file length
-                header_file_length += file.filesize;
-                if (header_file_length <= 
DorisFSDirectory::MAX_HEADER_DATA_SIZE) {
-                    copyFile(file.filename.c_str(), directory.get(), 
ram_output.get(), ram_buffer,
-                             buffer_length);
-                    header_file_count++;
-                }
-            }
-            auto header_len = ram_output->getFilePointer();
-            ram_output->close();
-            ram_dir.deleteFile(idx_name.c_str());
-            ram_dir.close();
-
-            auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, 
idx_path.c_str());
-            out_dir->set_file_writer_opts(_opts);
-
-            auto* out = out_dir->createOutput(idx_name.c_str());
-            
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
-                            { out = nullptr; });
-            if (out == nullptr) {
-                LOG(WARNING) << "Write compound file error: CompoundDirectory 
output is nullptr.";
-                _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
-            }
-            std::unique_ptr<lucene::store::IndexOutput> output(out);
             size_t start = output->getFilePointer();
-            output->writeVInt(file_count);
-            // write file entries
-            int64_t data_offset = header_len;
-            uint8_t header_buffer[buffer_length];
-            for (int i = 0; i < sorted_files.size(); ++i) {
-                auto file = sorted_files[i];
-                output->writeString(file.filename); // FileName
-                // DataOffset
-                if (i < header_file_count) {
-                    // file data write in header, so we set its offset to -1.
-                    output->writeLong(-1);
-                } else {
-                    output->writeLong(data_offset);
-                }
-                output->writeLong(file.filesize); // FileLength
-                if (i < header_file_count) {
-                    // append data
-                    copyFile(file.filename.c_str(), directory.get(), 
output.get(), header_buffer,
-                             buffer_length);
-                } else {
-                    data_offset += file.filesize;
-                }
-            }
-            // write rest files' data
-            uint8_t data_buffer[buffer_length];
-            for (int i = header_file_count; i < sorted_files.size(); ++i) {
-                auto file = sorted_files[i];
-                copyFile(file.filename.c_str(), directory.get(), output.get(), 
data_buffer,
-                         buffer_length);
-            }
-            out_dir->close();
-            // NOTE: need to decrease ref count, but not to delete here,
-            // because index cache may get the same directory from DIRECTORIES
-            _CLDECDELETE(out_dir)
+            // Write header and data
+            write_header_and_data_v1(output.get(), sorted_files, directory, 
header_length,
+                                     header_file_count);
+
+            // Close and clean up
+            finalize_output_dir(out_dir);
+
+            // Collect file information
             auto compound_file_size = output->getFilePointer() - start;
             output->close();
-            //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << 
compound_file_size;
             total_size += compound_file_size;
-            InvertedIndexFileInfo_IndexInfo index_info;
-            index_info.set_index_id(index_id);
-            index_info.set_index_suffix(index_suffix);
-            index_info.set_index_file_size(compound_file_size);
-            auto* new_index_info = _file_info.add_index_info();
-            *new_index_info = index_info;
+            add_index_info(index_id, index_suffix, compound_file_size);
+
         } catch (CLuceneError& err) {
+            finalize_output_dir(out_dir);
+            if (output != nullptr) {
+                output->close();
+                output.reset();
+            }
             auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
                     _index_path_prefix, index_id, index_suffix);
             LOG(ERROR) << "CLuceneError occur when write_v1 idx file " << 
index_path
@@ -386,110 +298,267 @@ Status InvertedIndexFileWriter::write_v1() {
 }
 
 Status InvertedIndexFileWriter::write_v2() {
-    io::Path index_path 
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
-    std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+    lucene::store::Directory* out_dir = nullptr;
+    std::unique_ptr<lucene::store::IndexOutput> compound_file_output = nullptr;
     try {
-        // Create the output stream to write the compound file
+        // Calculate header length and initialize offset
         int64_t current_offset = headerLength();
+        // Prepare file metadata
+        auto file_metadata = prepare_file_metadata_v2(current_offset);
+
+        // Create output stream
+        auto result = create_output_stream_v2();
+        out_dir = result.first;
+        compound_file_output = std::move(result.second);
+
+        // Write version and number of indices
+        write_version_and_indices_count(compound_file_output.get());
+
+        // Write index headers and file metadata
+        write_index_headers_and_metadata(compound_file_output.get(), 
file_metadata);
+
+        // Copy file data
+        copy_files_data_v2(compound_file_output.get(), file_metadata);
 
+        // Close and clean up
+        finalize_output_dir(out_dir);
+        _total_file_size = compound_file_output->getFilePointer();
+        _file_info.set_index_size(_total_file_size);
+        compound_file_output->close();
+        return Status::OK();
+    } catch (CLuceneError& err) {
         io::Path index_path 
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
+        LOG(ERROR) << "CLuceneError occur when close idx file " << index_path
+                   << " error msg: " << err.what();
+        if (compound_file_output != nullptr) {
+            compound_file_output->close();
+            compound_file_output.reset();
+        }
+        finalize_output_dir(out_dir);
+        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+                "CLuceneError occur when close idx file: {}, error msg: {}", 
index_path.c_str(),
+                err.what());
+    }
+}
 
-        auto* out_dir =
-                DorisFSDirectoryFactory::getDirectory(_fs, 
index_path.parent_path().c_str());
-        out_dir->set_file_writer_opts(_opts);
+// Helper function implementations
 
-        std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+std::vector<FileInfo> InvertedIndexFileWriter::prepare_sorted_files(
+        lucene::store::Directory* directory) {
+    std::vector<std::string> files;
+    directory->list(&files);
 
-        DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is 
nullptr";
-        compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
-                out_dir->createOutputV2(_idx_v2_writer.get()));
+    // Remove write.lock file
+    files.erase(std::remove(files.begin(), files.end(), 
DorisFSDirectory::WRITE_LOCK_FILE),
+                files.end());
 
-        // Write the version number
-        compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2);
+    std::vector<FileInfo> sorted_files;
+    for (const auto& file : files) {
+        FileInfo file_info;
+        file_info.filename = file;
+        file_info.filesize = directory->fileLength(file.c_str());
+        sorted_files.push_back(std::move(file_info));
+    }
 
-        // Write the number of indices
-        const auto numIndices = static_cast<uint32_t>(_indices_dirs.size());
-        compound_file_output->writeInt(numIndices);
+    // Sort the files
+    sort_files(sorted_files);
+    return sorted_files;
+}
 
-        std::vector<std::tuple<std::string, int64_t, int64_t, 
CL_NS(store)::Directory*>>
-                file_metadata; // Store file name, offset, file length, and 
corresponding directory
+void InvertedIndexFileWriter::finalize_output_dir(lucene::store::Directory* 
out_dir) {
+    if (out_dir != nullptr) {
+        out_dir->close();
+        _CLDECDELETE(out_dir)
+    }
+}
 
-        // First, write all index information and file metadata
-        for (const auto& entry : _indices_dirs) {
-            const int64_t index_id = entry.first.first;
-            const auto& index_suffix = entry.first.second;
-            const auto& dir = entry.second;
-            std::vector<std::string> files;
-            dir->list(&files);
-
-            auto it = std::find(files.begin(), files.end(), 
DorisFSDirectory::WRITE_LOCK_FILE);
-            if (it != files.end()) {
-                files.erase(it);
-            }
-            // sort file list by file length
-            std::vector<std::pair<std::string, int64_t>> sorted_files;
-            for (const auto& file : files) {
-                sorted_files.emplace_back(file, dir->fileLength(file.c_str()));
-            }
+void InvertedIndexFileWriter::add_index_info(int64_t index_id, const 
std::string& index_suffix,
+                                             int64_t compound_file_size) {
+    InvertedIndexFileInfo_IndexInfo index_info;
+    index_info.set_index_id(index_id);
+    index_info.set_index_suffix(index_suffix);
+    index_info.set_index_file_size(compound_file_size);
+    auto* new_index_info = _file_info.add_index_info();
+    *new_index_info = index_info;
+}
 
-            std::sort(
-                    sorted_files.begin(), sorted_files.end(),
-                    [](const std::pair<std::string, int64_t>& a,
-                       const std::pair<std::string, int64_t>& b) { return 
(a.second < b.second); });
-
-            int32_t file_count = sorted_files.size();
-
-            // Write the index ID and the number of files
-            compound_file_output->writeLong(index_id);
-            
compound_file_output->writeInt(static_cast<int32_t>(index_suffix.length()));
-            compound_file_output->writeBytes(reinterpret_cast<const 
uint8_t*>(index_suffix.data()),
-                                             index_suffix.length());
-            compound_file_output->writeInt(file_count);
-
-            // Calculate the offset for each file and write the file metadata
-            for (const auto& file : sorted_files) {
-                int64_t file_length = dir->fileLength(file.first.c_str());
-                
compound_file_output->writeInt(static_cast<int32_t>(file.first.length()));
-                compound_file_output->writeBytes(
-                        reinterpret_cast<const uint8_t*>(file.first.data()), 
file.first.length());
-                compound_file_output->writeLong(current_offset);
-                compound_file_output->writeLong(file_length);
-
-                file_metadata.emplace_back(file.first, current_offset, 
file_length, dir.get());
-                current_offset += file_length; // Update the data offset
-            }
+std::pair<int64_t, int32_t> InvertedIndexFileWriter::calculate_header_length(
+        const std::vector<FileInfo>& sorted_files, lucene::store::Directory* 
directory) {
+    // Use RAMDirectory to calculate header length
+    lucene::store::RAMDirectory ram_dir;
+    auto* out_idx = ram_dir.createOutput("temp_idx");
+    
DBUG_EXECUTE_IF("InvertedIndexFileWriter::calculate_header_length_ram_output_is_nullptr",
+                    { out_idx = nullptr; })
+    if (out_idx == nullptr) {
+        LOG(WARNING) << "InvertedIndexFileWriter::calculate_header_length 
error: RAMDirectory "
+                        "output is nullptr.";
+        _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
+    }
+    std::unique_ptr<lucene::store::IndexOutput> ram_output(out_idx);
+    int32_t file_count = sorted_files.size();
+    ram_output->writeVInt(file_count);
+
+    int64_t header_file_length = 0;
+    const int64_t buffer_length = 16384;
+    uint8_t ram_buffer[buffer_length];
+    int32_t header_file_count = 0;
+    for (const auto& file : sorted_files) {
+        ram_output->writeString(file.filename);
+        ram_output->writeLong(0);
+        ram_output->writeLong(file.filesize);
+        header_file_length += file.filesize;
+
+        if (header_file_length <= DorisFSDirectory::MAX_HEADER_DATA_SIZE) {
+            copyFile(file.filename.c_str(), directory, ram_output.get(), 
ram_buffer, buffer_length);
+            header_file_count++;
         }
+    }
 
-        const int64_t buffer_length = 16384;
-        uint8_t header_buffer[buffer_length];
+    int64_t header_length = ram_output->getFilePointer();
+    ram_output->close();
+    ram_dir.close();
+    return {header_length, header_file_count};
+}
+
+std::pair<lucene::store::Directory*, 
std::unique_ptr<lucene::store::IndexOutput>>
+InvertedIndexFileWriter::create_output_stream_v1(int64_t index_id,
+                                                 const std::string& 
index_suffix) {
+    io::Path 
cfs_path(InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix, 
index_id,
+                                                                      
index_suffix));
+    auto idx_path = cfs_path.parent_path();
+    std::string idx_name = cfs_path.filename();
+
+    auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, 
idx_path.c_str());
+    out_dir->set_file_writer_opts(_opts);
+
+    auto* out = out_dir->createOutput(idx_name.c_str());
+    
DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
+                    { out = nullptr; });
+    if (out == nullptr) {
+        LOG(WARNING) << "InvertedIndexFileWriter::create_output_stream_v1 
error: CompoundDirectory "
+                        "output is nullptr.";
+        _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
+    }
 
-        // Next, write the file data
-        for (const auto& info : file_metadata) {
-            const std::string& file = std::get<0>(info);
-            auto* dir = std::get<3>(info);
+    std::unique_ptr<lucene::store::IndexOutput> output(out);
+    return {out_dir, std::move(output)};
+}
 
-            // Write the actual file data
-            copyFile(file.c_str(), dir, compound_file_output.get(), 
header_buffer, buffer_length);
+void 
InvertedIndexFileWriter::write_header_and_data_v1(lucene::store::IndexOutput* 
output,
+                                                       const 
std::vector<FileInfo>& sorted_files,
+                                                       
lucene::store::Directory* directory,
+                                                       int64_t header_length,
+                                                       int32_t 
header_file_count) {
+    output->writeVInt(sorted_files.size());
+    int64_t data_offset = header_length;
+    const int64_t buffer_length = 16384;
+    uint8_t buffer[buffer_length];
+
+    for (int i = 0; i < sorted_files.size(); ++i) {
+        auto file = sorted_files[i];
+        output->writeString(file.filename);
+
+        // DataOffset
+        if (i < header_file_count) {
+            // file data write in header, so we set its offset to -1.
+            output->writeLong(-1);
+        } else {
+            output->writeLong(data_offset);
         }
+        output->writeLong(file.filesize); // FileLength
+        if (i < header_file_count) {
+            // append data
+            copyFile(file.filename.c_str(), directory, output, buffer, 
buffer_length);
+        } else {
+            data_offset += file.filesize;
+        }
+    }
 
-        out_dir->close();
-        // NOTE: need to decrease ref count, but not to delete here,
-        // because index cache may get the same directory from DIRECTORIES
-        _CLDECDELETE(out_dir)
-        _total_file_size = compound_file_output->getFilePointer();
-        compound_file_output->close();
-        _file_info.set_index_size(_total_file_size);
-    } catch (CLuceneError& err) {
-        LOG(ERROR) << "CLuceneError occur when close idx file " << index_path
-                   << " error msg: " << err.what();
-        if (compound_file_output) {
-            compound_file_output->close();
-            compound_file_output.reset();
+    for (size_t i = header_file_count; i < sorted_files.size(); ++i) {
+        copyFile(sorted_files[i].filename.c_str(), directory, output, buffer, 
buffer_length);
+    }
+}
+
+std::pair<lucene::store::Directory*, 
std::unique_ptr<lucene::store::IndexOutput>>
+InvertedIndexFileWriter::create_output_stream_v2() {
+    io::Path index_path 
{InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
+    auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, 
index_path.parent_path().c_str());
+    out_dir->set_file_writer_opts(_opts);
+    DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is 
nullptr";
+    auto compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+            out_dir->createOutputV2(_idx_v2_writer.get()));
+    return std::make_pair(out_dir, std::move(compound_file_output));
+}
+
+void 
InvertedIndexFileWriter::write_version_and_indices_count(lucene::store::IndexOutput*
 output) {
+    // Write the version number
+    output->writeInt(InvertedIndexStorageFormatPB::V2);
+
+    // Write the number of indices
+    const auto num_indices = static_cast<uint32_t>(_indices_dirs.size());
+    output->writeInt(num_indices);
+}
+
+std::vector<InvertedIndexFileWriter::FileMetadata>
+InvertedIndexFileWriter::prepare_file_metadata_v2(int64_t& current_offset) {
+    std::vector<FileMetadata> file_metadata;
+
+    for (const auto& entry : _indices_dirs) {
+        const int64_t index_id = entry.first.first;
+        const auto& index_suffix = entry.first.second;
+        auto* dir = entry.second.get();
+
+        // Get sorted files
+        auto sorted_files = prepare_sorted_files(dir);
+
+        for (const auto& file : sorted_files) {
+            file_metadata.emplace_back(index_id, index_suffix, file.filename, 
current_offset,
+                                       file.filesize, dir);
+            current_offset += file.filesize; // Update the data offset
         }
-        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
-                "CLuceneError occur when close idx file: {}, error msg: {}", 
index_path.c_str(),
-                err.what());
     }
-    return Status::OK();
+    return file_metadata;
+}
+
+void InvertedIndexFileWriter::write_index_headers_and_metadata(
+        lucene::store::IndexOutput* output, const std::vector<FileMetadata>& 
file_metadata) {
+    // Group files by index_id and index_suffix
+    std::map<std::pair<int64_t, std::string>, std::vector<FileMetadata>> 
indices;
+
+    for (const auto& meta : file_metadata) {
+        indices[{meta.index_id, meta.index_suffix}].push_back(meta);
+    }
+
+    for (const auto& index_entry : indices) {
+        int64_t index_id = index_entry.first.first;
+        const std::string& index_suffix = index_entry.first.second;
+        const auto& files = index_entry.second;
+
+        // Write the index ID and the number of files
+        output->writeLong(index_id);
+        output->writeInt(static_cast<int32_t>(index_suffix.length()));
+        output->writeBytes(reinterpret_cast<const 
uint8_t*>(index_suffix.data()),
+                           index_suffix.length());
+        output->writeInt(static_cast<int32_t>(files.size()));
+
+        // Write file metadata
+        for (const auto& file : files) {
+            output->writeInt(static_cast<int32_t>(file.filename.length()));
+            output->writeBytes(reinterpret_cast<const 
uint8_t*>(file.filename.data()),
+                               file.filename.length());
+            output->writeLong(file.offset);
+            output->writeLong(file.length);
+        }
+    }
+}
+
+void InvertedIndexFileWriter::copy_files_data_v2(lucene::store::IndexOutput* 
output,
+                                                 const 
std::vector<FileMetadata>& file_metadata) {
+    const int64_t buffer_length = 16384;
+    uint8_t buffer[buffer_length];
+
+    for (const auto& meta : file_metadata) {
+        copyFile(meta.filename.c_str(), meta.directory, output, buffer, 
buffer_length);
+    }
 }
 } // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h 
b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index 31e287d6dd3..3a2fcc1e6ac 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -28,7 +28,9 @@
 
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
 class TabletIndex;
@@ -36,7 +38,7 @@ class TabletIndex;
 namespace segment_v2 {
 class DorisFSDirectory;
 using InvertedIndexDirectoryMap =
-        std::map<std::pair<int64_t, std::string>, 
std::unique_ptr<lucene::store::Directory>>;
+        std::map<std::pair<int64_t, std::string>, 
std::shared_ptr<lucene::store::Directory>>;
 
 class InvertedIndexFileWriter;
 using InvertedIndexFileWriterPtr = std::unique_ptr<InvertedIndexFileWriter>;
@@ -58,16 +60,19 @@ public:
               _rowset_id(std::move(rowset_id)),
               _seg_id(seg_id),
               _storage_format(storage_format),
-              _idx_v2_writer(std::move(file_writer)) {}
+              _local_fs(io::global_local_filesystem()),
+              _idx_v2_writer(std::move(file_writer)) {
+        auto tmp_file_dir = 
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir();
+        _tmp_dir = tmp_file_dir.native();
+    }
 
-    Result<DorisFSDirectory*> open(const TabletIndex* index_meta);
+    Result<std::shared_ptr<DorisFSDirectory>> open(const TabletIndex* 
index_meta);
     Status delete_index(const TabletIndex* index_meta);
     Status initialize(InvertedIndexDirectoryMap& indices_dirs);
-    ~InvertedIndexFileWriter() = default;
+    virtual ~InvertedIndexFileWriter() = default;
     Status write_v2();
     Status write_v1();
     Status close();
-    int64_t headerLength();
     const InvertedIndexFileInfo* get_index_file_info() const {
         DCHECK(_closed) << debug_string();
         return &_file_info;
@@ -77,11 +82,7 @@ public:
         return _total_file_size;
     }
     const io::FileSystemSPtr& get_fs() const { return _fs; }
-    void sort_files(std::vector<FileInfo>& file_infos);
-    void copyFile(const char* fileName, lucene::store::Directory* dir,
-                  lucene::store::IndexOutput* output, uint8_t* buffer, int64_t 
bufferLength);
     InvertedIndexStorageFormatPB get_storage_format() const { return 
_storage_format; }
-
     void set_file_writer_opts(const io::FileWriterOptions& opts) { _opts = 
opts; }
 
     std::string debug_string() const {
@@ -99,12 +100,61 @@ public:
     }
 
 private:
+    // Helper functions shared between write_v1 and write_v2
+    std::vector<FileInfo> prepare_sorted_files(lucene::store::Directory* 
directory);
+    void sort_files(std::vector<FileInfo>& file_infos);
+    void copyFile(const char* fileName, lucene::store::Directory* dir,
+                  lucene::store::IndexOutput* output, uint8_t* buffer, int64_t 
bufferLength);
+    void finalize_output_dir(lucene::store::Directory* out_dir);
+    void add_index_info(int64_t index_id, const std::string& index_suffix,
+                        int64_t compound_file_size);
+    int64_t headerLength();
+    // Helper functions specific to write_v1
+    std::pair<int64_t, int32_t> calculate_header_length(const 
std::vector<FileInfo>& sorted_files,
+                                                        
lucene::store::Directory* directory);
+    std::pair<lucene::store::Directory*, 
std::unique_ptr<lucene::store::IndexOutput>>
+    create_output_stream_v1(int64_t index_id, const std::string& index_suffix);
+    virtual void write_header_and_data_v1(lucene::store::IndexOutput* output,
+                                          const std::vector<FileInfo>& 
sorted_files,
+                                          lucene::store::Directory* directory,
+                                          int64_t header_length, int32_t 
header_file_count);
+    // Helper functions specific to write_v2
+    std::pair<lucene::store::Directory*, 
std::unique_ptr<lucene::store::IndexOutput>>
+    create_output_stream_v2();
+    void write_version_and_indices_count(lucene::store::IndexOutput* output);
+    // Describes one file handled by the v2 helpers: the index it belongs to, the name/offset/
+    // length that go into the header, and the directory the file is copied from.
+    struct FileMetadata {
+        int64_t index_id;                    // grouping key, together with index_suffix
+        std::string index_suffix;            // second half of the grouping key
+        std::string filename;                // name written to the header and passed to copyFile()
+        int64_t offset;                      // written to the header as a long
+        int64_t length;                      // written to the header as a long
+        lucene::store::Directory* directory; // non-owning; source directory for copyFile()
+
+        FileMetadata(int64_t index_id_, const std::string& index_suffix_,
+                     const std::string& filename_, int64_t offset_, int64_t length_,
+                     lucene::store::Directory* directory_)
+                : index_id {index_id_},
+                  index_suffix {index_suffix_},
+                  filename {filename_},
+                  offset {offset_},
+                  length {length_},
+                  directory {directory_} {}
+    };
+    std::vector<FileMetadata> prepare_file_metadata_v2(int64_t& 
current_offset);
+    virtual void write_index_headers_and_metadata(lucene::store::IndexOutput* 
output,
+                                                  const 
std::vector<FileMetadata>& file_metadata);
+    void copy_files_data_v2(lucene::store::IndexOutput* output,
+                            const std::vector<FileMetadata>& file_metadata);
+    Status _insert_directory_into_map(int64_t index_id, const std::string& 
index_suffix,
+                                      std::shared_ptr<DorisFSDirectory> dir);
+    // Member variables...
     InvertedIndexDirectoryMap _indices_dirs;
     const io::FileSystemSPtr _fs;
     std::string _index_path_prefix;
     std::string _rowset_id;
     int64_t _seg_id;
     InvertedIndexStorageFormatPB _storage_format;
+    std::string _tmp_dir;
+    const std::shared_ptr<io::LocalFileSystem>& _local_fs;
 
     // write to disk or stream
     io::FileWriterPtr _idx_v2_writer = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 29fe4609e59..a4f3ca55dd1 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -197,7 +197,7 @@ public:
         bool create_index = true;
         bool close_dir_on_shutdown = true;
         auto index_writer = std::make_unique<lucene::index::IndexWriter>(
-                _dir, _analyzer.get(), create_index, close_dir_on_shutdown);
+                _dir.get(), _analyzer.get(), create_index, 
close_dir_on_shutdown);
         
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error",
                         { index_writer->setRAMBufferSizeMB(-100); })
         
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error",
@@ -708,7 +708,7 @@ private:
     std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
     std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
     InvertedIndexCtxSPtr _inverted_index_ctx = nullptr;
-    DorisFSDirectory* _dir = nullptr;
+    std::shared_ptr<DorisFSDirectory> _dir = nullptr;
     const KeyCoder* _value_key_coder;
     const TabletIndex* _index_meta;
     InvertedIndexParserType _parser_type;
diff --git a/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp 
b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
new file mode 100644
index 00000000000..dd3b4195c14
--- /dev/null
+++ b/be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
@@ -0,0 +1,515 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
+#include "olap/storage_engine.h"
+
+namespace doris {
+namespace segment_v2 {
+
+using namespace doris::vectorized;
+
+class InvertedIndexFileWriterTest : public ::testing::Test { // fixture shared by every TEST_F in this file
+protected:
+    class MockDorisFSDirectoryFileLength : public DorisFSDirectory { // gmock subclass: only fileLength() is mocked
+    public:
+        //MOCK_METHOD(lucene::store::IndexOutput*, createOutput, (const char* 
name), (override));
+        MOCK_METHOD(int64_t, fileLength, (const char* name), (const, 
override));
+        //MOCK_METHOD(void, close, (), (override));
+        //MOCK_METHOD(const char*, getObjectName, (), (const, override));
+    };
+    class MockDorisFSDirectoryOpenInput : public DorisFSDirectory { // gmock subclass: only openInput() is mocked
+    public:
+        MOCK_METHOD(bool, openInput,
+                    (const char* name, lucene::store::IndexInput*& ret, 
CLuceneError& err,
+                     int32_t bufferSize),
+                    (override));
+    };
+    void SetUp() override { // recreates the scratch dirs and installs the ExecEnv globals set below
+        char buffer[MAX_PATH_LEN];
+        ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        _current_dir = std::string(buffer);
+        _absolute_dir = _current_dir + "/" + std::string(dest_dir);
+        
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
+        
ASSERT_TRUE(io::global_local_filesystem()->create_directory(_absolute_dir).ok());
+        // tmp dir: recreated from scratch, then registered with ExecEnv through TmpFileDirs
+        
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
+        
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
+        std::vector<StorePath> paths;
+        paths.emplace_back(std::string(tmp_dir), -1);
+        auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
+        EXPECT_TRUE(tmp_file_dirs->init().ok());
+        ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));
+
+        // searcher cache: global instance created with a limit of 0, owned by this fixture
+        int64_t inverted_index_cache_limit = 0;
+        _inverted_index_searcher_cache = 
std::unique_ptr<segment_v2::InvertedIndexSearcherCache>(
+                
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit,
+                                                                   256));
+
+        ExecEnv::GetInstance()->set_inverted_index_searcher_cache(
+                _inverted_index_searcher_cache.get());
+        doris::EngineOptions options;
+        auto engine = std::make_unique<StorageEngine>(options);
+        _engine_ref = engine.get(); // raw alias kept because ownership moves to ExecEnv below
+        _data_dir = std::make_unique<DataDir>(*_engine_ref, _absolute_dir);
+        ASSERT_TRUE(_data_dir->update_capacity().ok());
+        ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
+
+        _fs = io::global_local_filesystem();
+        _index_path_prefix = _absolute_dir + "/index_test";
+        _rowset_id = "test_rowset";
+        _seg_id = 1;
+    }
+
+    void TearDown() override { // removes _absolute_dir and clears the storage engine; tmp_dir is not removed here
+        
ASSERT_TRUE(io::global_local_filesystem()->delete_directory(_absolute_dir).ok());
+        ExecEnv::GetInstance()->set_storage_engine(nullptr);
+    }
+
+    std::unique_ptr<TabletIndex> create_mock_tablet_index(int64_t index_id,
+                                                          const std::string& 
index_suffix) {
+        TabletIndexPB index_pb; // protobuf carrying the id, suffix and INVERTED type for the returned TabletIndex
+        index_pb.set_index_id(index_id);
+        index_pb.set_index_suffix_name(index_suffix);
+        index_pb.set_index_type(IndexType::INVERTED);
+        auto index = std::make_unique<TabletIndex>();
+        index->init_from_pb(index_pb);
+        return index;
+    }
+
+    std::string _current_dir; // working directory captured by getcwd() in SetUp
+    std::string _absolute_dir; // _current_dir + "/" + dest_dir
+    io::FileSystemSPtr _fs; // the global local filesystem
+    std::string _index_path_prefix;
+    std::string _rowset_id;
+    int64_t _seg_id;
+    StorageEngine* _engine_ref = nullptr; // non-owning; the engine itself is handed to ExecEnv
+    std::unique_ptr<DataDir> _data_dir = nullptr;
+    std::unique_ptr<InvertedIndexSearcherCache> _inverted_index_searcher_cache;
+
+    constexpr static uint32_t MAX_PATH_LEN = 1024; // size of the getcwd() buffer
+    constexpr static std::string_view dest_dir = 
"./ut_dir/inverted_index_file_writer_test";
+    constexpr static std::string_view tmp_dir = "./ut_dir/tmp"; // relative path given to TmpFileDirs
+};
+
+// initialize() must accept a pre-built directory map, and the writer must keep reporting the
+// storage format it was constructed with.
+TEST_F(InvertedIndexFileWriterTest, InitializeTest) {
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+
+    // InvertedIndexDirectoryMap stores shared_ptr values, so build the entries with make_shared
+    // like the other tests in this file instead of converting from unique_ptr.
+    InvertedIndexDirectoryMap indices_dirs;
+    indices_dirs.emplace(std::make_pair(1, "suffix1"), std::make_shared<DorisFSDirectory>());
+    indices_dirs.emplace(std::make_pair(2, "suffix2"), std::make_shared<DorisFSDirectory>());
+
+    Status status = writer.initialize(indices_dirs);
+    ASSERT_TRUE(status.ok()) << "initialize error in InitializeTest: " << status.msg();
+
+    ASSERT_EQ(writer.get_storage_format(), InvertedIndexStorageFormatPB::V2);
+}
+
+// open() must return a directory and register that same directory in _indices_dirs under
+// the (index_id, index_suffix) key.
+TEST_F(InvertedIndexFileWriterTest, OpenTest) {
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+
+    int64_t id = 1;
+    std::string suffix = "suffix1";
+    auto tablet_index = create_mock_tablet_index(id, suffix);
+    ASSERT_NE(tablet_index, nullptr);
+
+    auto opened = writer.open(tablet_index.get());
+    ASSERT_TRUE(opened.has_value());
+    auto opened_dir = opened.value();
+    ASSERT_NE(opened_dir, nullptr);
+
+    // Look the entry up once and check both presence and pointer identity.
+    auto registered = writer._indices_dirs.find(std::make_pair(id, suffix));
+    ASSERT_TRUE(registered != writer._indices_dirs.end());
+    ASSERT_TRUE(registered->second.get() == opened_dir.get());
+}
+
+// delete_index() must remove a registered directory, and deleting the same index a second
+// time (when it is no longer present) must still report OK.
+TEST_F(InvertedIndexFileWriterTest, DeleteIndexTest) {
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+
+    int64_t index_id = 1;
+    std::string index_suffix = "suffix1";
+    auto st = writer._insert_directory_into_map(index_id, index_suffix,
+                                                std::make_shared<DorisFSDirectory>());
+    // A fatal assertion already leaves the test body, so the diagnostic is attached to it
+    // instead of being printed separately and followed by an unreachable return.
+    ASSERT_TRUE(st.ok()) << "_insert_directory_into_map error in DeleteIndexTest: " << st.msg();
+
+    auto key = std::make_pair(index_id, index_suffix);
+    ASSERT_TRUE(writer._indices_dirs.find(key) != writer._indices_dirs.end());
+
+    auto index_meta = create_mock_tablet_index(index_id, index_suffix);
+    ASSERT_NE(index_meta, nullptr);
+    Status del_status = writer.delete_index(index_meta.get());
+    ASSERT_TRUE(del_status.ok()) << del_status.msg();
+    ASSERT_TRUE(writer._indices_dirs.find(key) == writer._indices_dirs.end());
+
+    // Second delete: the entry is already gone.
+    Status del_nonexist_status = writer.delete_index(index_meta.get());
+    ASSERT_TRUE(del_nonexist_status.ok()) << del_nonexist_status.msg();
+}
+
+// V1 write path: one index with one small file; after close() the recorded index file size
+// must be positive and equal to the writer's total size.
+TEST_F(InvertedIndexFileWriterTest, WriteV1Test) {
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V1);
+
+    int64_t index_id = 1;
+    std::string index_suffix = "suffix1";
+    auto index_meta = create_mock_tablet_index(index_id, index_suffix);
+    ASSERT_NE(index_meta, nullptr);
+
+    auto open_result = writer.open(index_meta.get());
+    ASSERT_TRUE(open_result.has_value());
+    auto dir = open_result.value();
+    ASSERT_NE(dir, nullptr); // guard the dereferences below
+    auto out_file =
+            std::unique_ptr<lucene::store::IndexOutput>(dir->createOutput("write_v1_test"));
+    out_file->writeString("test1");
+    out_file->close();
+    dir->close();
+
+    Status close_status = writer.close();
+    ASSERT_TRUE(close_status.ok()) << "close error:" << close_status.msg();
+
+    const InvertedIndexFileInfo* file_info = writer.get_index_file_info();
+    ASSERT_NE(file_info, nullptr);
+    // index_info(0) is only valid when at least one entry was recorded.
+    ASSERT_GT(file_info->index_info_size(), 0);
+    const auto& index_info = file_info->index_info(0);
+    ASSERT_GT(index_info.index_file_size(), 0);
+
+    int64_t total_size = writer.get_index_file_total_size();
+    ASSERT_GT(total_size, 0);
+    ASSERT_EQ(total_size, index_info.index_file_size());
+    std::cout << "total_size:" << total_size << std::endl;
+}
+
+// V2 write path: two indexes each receive one small file; after close() the recorded index
+// size must be positive and equal to the writer's total size.
+TEST_F(InvertedIndexFileWriterTest, WriteV2Test) {
+    io::FileWriterPtr v2_file_writer;
+    std::string v2_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
+    io::FileWriterOptions writer_opts;
+    Status create_st = _fs->create_file(v2_path, &v2_file_writer, &writer_opts);
+    ASSERT_TRUE(create_st.ok());
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2, std::move(v2_file_writer));
+
+    // First index: id 1 / "suffix1".
+    int64_t first_id = 1;
+    std::string first_suffix = "suffix1";
+    auto first_meta = create_mock_tablet_index(first_id, first_suffix);
+    ASSERT_NE(first_meta, nullptr);
+    auto first_opened = writer.open(first_meta.get());
+    ASSERT_TRUE(first_opened.has_value());
+    auto first_dir = first_opened.value();
+    std::unique_ptr<lucene::store::IndexOutput> first_out(
+            first_dir->createOutput("write_v2_test_index_1"));
+    first_out->writeString("test1");
+    first_out->close();
+    first_dir->close();
+
+    // Second index: id 2 / "suffix2".
+    int64_t second_id = 2;
+    std::string second_suffix = "suffix2";
+    auto second_meta = create_mock_tablet_index(second_id, second_suffix);
+    ASSERT_NE(second_meta, nullptr);
+    auto second_opened = writer.open(second_meta.get());
+    ASSERT_TRUE(second_opened.has_value());
+    auto second_dir = second_opened.value();
+    std::unique_ptr<lucene::store::IndexOutput> second_out(
+            second_dir->createOutput("write_v2_test_index_2"));
+    second_out->writeString("test2");
+    second_out->close();
+    second_dir->close();
+
+    Status close_st = writer.close();
+    ASSERT_TRUE(close_st.ok());
+
+    const InvertedIndexFileInfo* info = writer.get_index_file_info();
+    ASSERT_NE(info, nullptr);
+    ASSERT_GT(info->index_size(), 0);
+
+    int64_t written_bytes = writer.get_index_file_total_size();
+    ASSERT_GT(written_bytes, 0);
+    ASSERT_EQ(written_bytes, info->index_size());
+    std::cout << "total_size:" << written_bytes << std::endl;
+}
+
+// headerLength() must match the size of the v2 header for two registered directories holding
+// two files ("file1.dat", "file2.dat") and one file ("file3.dat") respectively.
+TEST_F(InvertedIndexFileWriterTest, HeaderLengthTest) {
+    auto mock_dir1 = std::make_shared<DorisFSDirectory>();
+    auto mock_dir2 = std::make_shared<DorisFSDirectory>();
+    std::string local_fs_index_path_1 = InvertedIndexDescriptor::get_temporary_index_path(
+            ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id,
+            _seg_id, 1, "suffix1");
+    std::string local_fs_index_path_2 = InvertedIndexDescriptor::get_temporary_index_path(
+            ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id,
+            _seg_id, 2, "suffix2");
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path_1).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path_1).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path_2).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path_2).ok());
+    mock_dir1->init(_fs, local_fs_index_path_1.c_str());
+    mock_dir2->init(_fs, local_fs_index_path_2.c_str());
+
+    const std::string suffix1 = "suffix1";
+    const std::string suffix2 = "suffix2";
+    std::vector<std::string> files1 = {"file1.dat", "file2.dat"};
+    std::vector<std::string> files2 = {"file3.dat"};
+    for (const auto& file : files1) {
+        auto out_file_1 =
+                std::unique_ptr<lucene::store::IndexOutput>(mock_dir1->createOutput(file.c_str()));
+        out_file_1->writeString("test1");
+        out_file_1->close();
+    }
+    for (const auto& file : files2) {
+        auto out_file_2 =
+                std::unique_ptr<lucene::store::IndexOutput>(mock_dir2->createOutput(file.c_str()));
+        out_file_2->writeString("test2");
+        out_file_2->close();
+    }
+
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+    // Register the directories with real gtest assertions: the previous assert(false) inside a
+    // helper lambda compiles to nothing under NDEBUG, so a failed insert went unnoticed.
+    Status st1 = writer._insert_directory_into_map(1, suffix1, mock_dir1);
+    ASSERT_TRUE(st1.ok()) << "_insert_directory_into_map error in HeaderLengthTest: "
+                          << st1.msg();
+    Status st2 = writer._insert_directory_into_map(2, suffix2, mock_dir2);
+    ASSERT_TRUE(st2.ok()) << "_insert_directory_into_map error in HeaderLengthTest: "
+                          << st2.msg();
+
+    int64_t header_length = writer.headerLength();
+
+    // Expected layout:
+    //   sizeof(int32_t) * 2                                    (version and num_indices)
+    // + per index: int64 index_id + int32 suffix size + suffix bytes + int32 file_count
+    // + per file:  int32 name size + name bytes + int64 offset + int64 length
+    auto index_record_size = [](const std::string& suffix) {
+        return static_cast<int64_t>(sizeof(int64_t) + sizeof(int32_t) + suffix.length() +
+                                    sizeof(int32_t));
+    };
+    auto file_record_size = [](const std::string& name) {
+        return static_cast<int64_t>(sizeof(int32_t) + name.length() + sizeof(int64_t) +
+                                    sizeof(int64_t));
+    };
+
+    int64_t expected_header_length = sizeof(int32_t) * 2; // version and num_indices
+
+    // Index 1
+    expected_header_length += index_record_size(suffix1);
+    for (const auto& file : files1) {
+        expected_header_length += file_record_size(file);
+    }
+
+    // Index 2
+    expected_header_length += index_record_size(suffix2);
+    for (const auto& file : files2) {
+        expected_header_length += file_record_size(file);
+    }
+
+    ASSERT_EQ(header_length, expected_header_length);
+}
+
+// prepare_sorted_files() must return the four index files in the expected order, carry the
+// sizes reported by the mocked fileLength(), and leave "write.lock" out of the result.
+TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
+    auto mock_dir = std::make_shared<MockDorisFSDirectoryFileLength>();
+    std::string local_fs_index_path = InvertedIndexDescriptor::get_temporary_index_path(
+            ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), _rowset_id,
+            _seg_id, 1, "suffix1");
+    EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
+    EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
+    mock_dir->init(_fs, local_fs_index_path.c_str());
+    std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"};
+    for (const auto& file : files) {
+        auto out_file_1 =
+                std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
+        out_file_1->writeString("test1");
+        out_file_1->close();
+    }
+
+    // No expectation is registered for "write.lock".
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
+            .WillOnce(testing::Return(1000));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
+    EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap")))
+            .WillOnce(testing::Return(500));
+
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+    auto st = writer._insert_directory_into_map(1, "suffix1", mock_dir);
+    // The fatal assertion itself carries the diagnostic; nothing after it runs on failure.
+    ASSERT_TRUE(st.ok()) << "_insert_directory_into_map error in PrepareSortedFilesTest: "
+                         << st.msg();
+
+    std::vector<FileInfo> sorted_files =
+            writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get());
+
+    // Expected order and the mocked size of each file:
+    // 1. 0.segments (priority 1, size 1000)
+    // 2. 0.fnm (priority 2, size 2000)
+    // 3. 0.tii (priority 3, size 1500)
+    // 4. nullbitmap (priority 4, size 500)
+    const std::vector<std::pair<std::string, int64_t>> expected = {
+            {"0.segments", 1000}, {"0.fnm", 2000}, {"0.tii", 1500}, {"nullbitmap", 500}};
+    ASSERT_EQ(sorted_files.size(), expected.size());
+
+    for (size_t i = 0; i < expected.size(); ++i) {
+        EXPECT_EQ(sorted_files[i].filename, expected[i].first);
+        EXPECT_EQ(sorted_files[i].filesize, expected[i].second);
+    }
+}
+/*TEST_F(InvertedIndexFileWriterTest, CopyFileTest_OpenInputFailure) {
+    auto mock_dir = std::make_shared<MockDorisFSDirectoryOpenInput>();
+    std::string local_fs_index_path = 
InvertedIndexDescriptor::get_temporary_index_path(
+            
ExecEnv::GetInstance()->get_tmp_file_dirs()->get_tmp_file_dir().native(), 
_rowset_id,
+            _seg_id, 1, "suffix1");
+    
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
+    
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
+    mock_dir->init(_fs, local_fs_index_path.c_str());
+    std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", 
"nullbitmap", "write.lock"};
+    for (auto& file : files) {
+        auto out_file_1 =
+                
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
+        out_file_1->writeString("test1");
+        out_file_1->close();
+    }
+    InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, 
_seg_id,
+                                   InvertedIndexStorageFormatPB::V2);
+    auto st = writer._insert_directory_into_map(1, "suffix1", mock_dir);
+    if (!st.ok()) {
+        std::cerr << "_insert_directory_into_map error in 
CopyFileTest_OpenInputFailure: "
+                  << st.msg() << std::endl;
+        ASSERT_TRUE(false);
+        return;
+    }
+
+    EXPECT_CALL(*mock_dir,
+                openInput(::testing::StrEq("0.segments"), ::testing::_, 
::testing::_, ::testing::_))
+            .WillOnce(::testing::Invoke([&](const char* name, 
lucene::store::IndexInput*& ret,
+                                            CLuceneError& err_ref, int 
bufferSize) {
+                err_ref.set(CL_ERR_IO, fmt::format("Could not open file, file 
is {}", name).data());
+                return false;
+            }));
+
+    uint8_t buffer[16384];
+    std::string error_message;
+    try {
+        writer.copyFile("0.segments", mock_dir.get(), nullptr, buffer, 
sizeof(buffer));
+    } catch (CLuceneError& err) {
+        error_message = err.what();
+    }
+    ASSERT_EQ(error_message, "Could not open file, file is 0.segments");
+}*/
+// gmock subclass of InvertedIndexFileWriter that overrides write_header_and_data_v1() so a
+// test can script what that step does; the constructor only forwards to the base class.
+class InvertedIndexFileWriterMock : public InvertedIndexFileWriter {
+public:
+    InvertedIndexFileWriterMock(const io::FileSystemSPtr& file_system,
+                                const std::string& path_prefix, const std::string& rowset,
+                                int32_t seg, InvertedIndexStorageFormatPB format)
+            : InvertedIndexFileWriter(file_system, path_prefix, rowset, seg, format) {}
+
+    MOCK_METHOD(void, write_header_and_data_v1,
+                (lucene::store::IndexOutput * out, const std::vector<FileInfo>& sorted_files,
+                 lucene::store::Directory* directory, int64_t header_len,
+                 int32_t header_file_count),
+                (override));
+};
+// When write_header_and_data_v1() throws a CLuceneError, write_v1() must return a non-OK
+// status whose code is INVERTED_INDEX_CLUCENE_ERROR.
+TEST_F(InvertedIndexFileWriterTest, WriteV1ExceptionHandlingTest) {
+    using ::testing::_;
+    using ::testing::Throw;
+
+    InvertedIndexFileWriterMock writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                            InvertedIndexStorageFormatPB::V1);
+
+    int64_t id = 1;
+    std::string suffix = "suffix1";
+    auto tablet_index = create_mock_tablet_index(id, suffix);
+    ASSERT_NE(tablet_index, nullptr);
+
+    auto opened = writer_mock.open(tablet_index.get());
+    ASSERT_TRUE(opened.has_value());
+    auto index_dir = opened.value();
+
+    // Give the directory one file so there is something for write_v1() to process.
+    std::unique_ptr<lucene::store::IndexOutput> payload(index_dir->createOutput("test_file"));
+    payload->writeString("test data");
+    payload->close();
+    index_dir->close();
+
+    EXPECT_CALL(writer_mock, write_header_and_data_v1(_, _, _, _, _))
+            .WillOnce(Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));
+
+    Status result = writer_mock.write_v1();
+    ASSERT_FALSE(result.ok());
+    ASSERT_EQ(result.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
+}
+// gmock subclass of InvertedIndexFileWriter that overrides write_index_headers_and_metadata()
+// so a test can script what that step does; the constructor forwards everything, including
+// the file writer, to the base class.
+class InvertedIndexFileWriterMockV2 : public InvertedIndexFileWriter {
+public:
+    InvertedIndexFileWriterMockV2(const io::FileSystemSPtr& file_system,
+                                  const std::string& path_prefix, const std::string& rowset,
+                                  int32_t seg, InvertedIndexStorageFormatPB format,
+                                  io::FileWriterPtr v2_writer)
+            : InvertedIndexFileWriter(file_system, path_prefix, rowset, seg, format,
+                                      std::move(v2_writer)) {}
+
+    MOCK_METHOD(void, write_index_headers_and_metadata,
+                (lucene::store::IndexOutput * output,
+                 const std::vector<FileMetadata>& metadata),
+                (override));
+};
+
+// When write_index_headers_and_metadata() throws a CLuceneError, write_v2() must return a
+// non-OK status whose code is INVERTED_INDEX_CLUCENE_ERROR.
+TEST_F(InvertedIndexFileWriterTest, WriteV2ExceptionHandlingTest) {
+    using ::testing::_;
+    using ::testing::Throw;
+
+    io::FileWriterPtr v2_file_writer;
+    std::string v2_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
+    io::FileWriterOptions writer_opts;
+    Status create_st = _fs->create_file(v2_path, &v2_file_writer, &writer_opts);
+    ASSERT_TRUE(create_st.ok());
+    InvertedIndexFileWriterMockV2 writer_mock(_fs, _index_path_prefix, _rowset_id, _seg_id,
+                                              InvertedIndexStorageFormatPB::V2,
+                                              std::move(v2_file_writer));
+
+    int64_t id = 1;
+    std::string suffix = "suffix1";
+    auto tablet_index = create_mock_tablet_index(id, suffix);
+    ASSERT_NE(tablet_index, nullptr);
+
+    auto opened = writer_mock.open(tablet_index.get());
+    ASSERT_TRUE(opened.has_value());
+    auto index_dir = opened.value();
+
+    // Give the directory one file so there is something for write_v2() to process.
+    std::unique_ptr<lucene::store::IndexOutput> payload(index_dir->createOutput("test_file"));
+    payload->writeString("test data");
+    payload->close();
+    index_dir->close();
+
+    EXPECT_CALL(writer_mock, write_index_headers_and_metadata(_, _))
+            .WillOnce(Throw(CLuceneError(CL_ERR_IO, "Simulated exception", false)));
+
+    Status result = writer_mock.write_v2();
+    ASSERT_FALSE(result.ok());
+    ASSERT_EQ(result.code(), ErrorCode::INVERTED_INDEX_CLUCENE_ERROR);
+}
+
+} // namespace segment_v2
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to