This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 0186f7313b2 [feature](inverted index)write separated index files in RAM directory to reduce IO #28810 (#29305)
0186f7313b2 is described below

commit 0186f7313b249a842f3df7ac685ce0bbd16b2db1
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Sat Dec 30 09:08:22 2023 +0800

    [feature](inverted index)write separated index files in RAM directory to reduce IO #28810 (#29305)
---
 be/src/common/config.cpp                           |   2 +
 be/src/common/config.h                             |   2 +
 be/src/index-tools/index_tool.cpp                  |   8 +-
 be/src/olap/compaction.cpp                         |   3 +-
 .../rowset/segment_v2/inverted_index_cache.cpp     |   8 +-
 .../segment_v2/inverted_index_compaction.cpp       |  18 +-
 .../inverted_index_compound_directory.cpp          | 325 +++++++++++++++------
 .../segment_v2/inverted_index_compound_directory.h | 131 ++++++---
 .../rowset/segment_v2/inverted_index_reader.cpp    |   6 +-
 .../rowset/segment_v2/inverted_index_writer.cpp    |  15 +-
 10 files changed, 369 insertions(+), 149 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index af08d2deaab..a762d0de392 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1007,6 +1007,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
 DEFINE_Int32(max_depth_in_bkd_tree, "32");
 // index compaction
 DEFINE_mBool(inverted_index_compaction_enable, "false");
+// index by RAM directory
+DEFINE_mBool(inverted_index_ram_dir_enable, "false");
 // use num_broadcast_buffer blocks as buffer to do broadcast
 DEFINE_Int32(num_broadcast_buffer, "32");
 // semi-structure configs
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4fc3bc8dbfa..f71aad99ab3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1042,6 +1042,8 @@ DECLARE_mInt32(inverted_index_max_buffered_docs);
 DECLARE_Int32(max_depth_in_bkd_tree);
 // index compaction
 DECLARE_mBool(inverted_index_compaction_enable);
+// index by RAM directory
+DECLARE_mBool(inverted_index_ram_dir_enable);
 // use num_broadcast_buffer blocks as buffer to do broadcast
 DECLARE_Int32(num_broadcast_buffer);
 // semi-structure configs
diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp
index 9cd72795ef9..a5ecf9b996f 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -30,7 +30,7 @@
 #include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
 
 using doris::segment_v2::DorisCompoundReader;
-using doris::segment_v2::DorisCompoundDirectory;
+using doris::segment_v2::DorisCompoundDirectoryFactory;
 using doris::io::FileInfo;
 using namespace lucene::analysis;
 using namespace lucene::index;
@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
         auto fs = doris::io::global_local_filesystem();
         try {
             lucene::store::Directory* dir =
-                    DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+                    DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
             auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
             std::vector<std::string> files;
             std::cout << "Nested files for " << file_str << std::endl;
@@ -173,7 +173,7 @@ int main(int argc, char** argv) {
         auto fs = doris::io::global_local_filesystem();
         try {
             lucene::store::Directory* dir =
-                    DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+                    DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str());
             auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096);
             std::cout << "Term statistics for " << file_str << std::endl;
             std::cout << "==================================" << std::endl;
@@ -190,7 +190,7 @@ int main(int argc, char** argv) {
         auto fs = doris::io::global_local_filesystem();
         try {
             lucene::store::Directory* dir =
-                    DorisCompoundDirectory::getDirectory(fs, FLAGS_directory.c_str());
+                    DorisCompoundDirectoryFactory::getDirectory(fs, FLAGS_directory.c_str());
             if (FLAGS_idx_file_name == "") {
                 //try to search from directory's all files
                 std::vector<FileInfo> files;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f6c8b3bb5bf..2cbd354b6de 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -633,7 +633,8 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
                                 std::string dir_str = p.parent_path().string();
                                 std::string file_str = p.filename().string();
                                 lucene::store::Directory* dir =
-                                        DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
+                                        DorisCompoundDirectoryFactory::getDirectory(
+                                                fs, dir_str.c_str());
                                 DorisCompoundReader reader(dir, file_str.c_str());
                                 std::vector<std::string> files;
                                 reader.list(&files);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
index f3c68984ebb..0804fcca2b2 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
@@ -43,9 +43,9 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr;
 IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs,
                                                                   const std::string& index_dir,
                                                                   const std::string& file_name) {
-    DorisCompoundReader* directory =
-            new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()),
-                                    file_name.c_str(), config::inverted_index_read_buffer_size);
+    DorisCompoundReader* directory = new DorisCompoundReader(
+            DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()), file_name.c_str(),
+            config::inverted_index_read_buffer_size);
     auto closeDirectory = true;
     auto index_searcher =
             std::make_shared<lucene::search::IndexSearcher>(directory, closeDirectory);
@@ -190,7 +190,7 @@ int64_t InvertedIndexSearcherCache::mem_consumption() {
 
 bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key,
                                          InvertedIndexCacheHandle* handle) {
-    auto lru_handle = _cache->lookup(key.index_file_path);
+    auto* lru_handle = _cache->lookup(key.index_file_path);
     if (lru_handle == nullptr) {
         return false;
     }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index b3a28c6ebfc..8458de9e7e3 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -22,8 +22,7 @@
 #include "inverted_index_compound_directory.h"
 #include "inverted_index_compound_reader.h"
 
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
 Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_num,
                       std::vector<std::string> src_index_files,
                       std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs,
@@ -31,7 +30,7 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
                       std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec,
                       std::vector<uint32_t> dest_segment_num_rows) {
     lucene::store::Directory* dir =
-            DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false);
+            DorisCompoundDirectoryFactory::getDirectory(fs, index_writer_path.c_str());
     lucene::analysis::SimpleAnalyzer<char> analyzer;
     auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */,
                                                           true /* closeDirOnShutdown */);
@@ -42,8 +41,8 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
         // format: rowsetId_segmentId_indexId.idx
         std::string src_idx_full_name =
                 src_index_files[i] + "_" + std::to_string(index_id) + ".idx";
-        DorisCompoundReader* reader = new DorisCompoundReader(
-                DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()),
+        auto* reader = new DorisCompoundReader(
+                DorisCompoundDirectoryFactory::getDirectory(fs, tablet_path.c_str()),
                 src_idx_full_name.c_str());
         src_index_dirs[i] = reader;
     }
@@ -53,7 +52,7 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
     for (int i = 0; i < dest_segment_num; ++i) {
         // format: rowsetId_segmentId_columnId
         auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id);
-        dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs, path.c_str(), true);
+        dest_index_dirs[i] = DorisCompoundDirectoryFactory::getDirectory(fs, path.c_str(), true);
     }
 
     DCHECK_EQ(src_index_dirs.size(), trans_vec.size());
@@ -66,13 +65,13 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
     // when index_writer is destroyed, if closeDir is set, dir will be close
     // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed.
     _CLDECDELETE(dir)
-    for (auto d : src_index_dirs) {
+    for (auto* d : src_index_dirs) {
         if (d != nullptr) {
             d->close();
             _CLDELETE(d);
         }
     }
-    for (auto d : dest_index_dirs) {
+    for (auto* d : dest_index_dirs) {
         if (d != nullptr) {
             // NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize.
             //d->close();
@@ -84,5 +83,4 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu
     fs->delete_directory(index_writer_path.c_str());
     return Status::OK();
 }
-} // namespace segment_v2
-} // namespace doris
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index cba340a26f4..0796102e908 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -18,10 +18,10 @@
 #include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
 
 #include "CLucene/SharedHeader.h"
+#include "CLucene/_SharedHeader.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_writer.h"
-#include "io/fs/path.h"
 #include "util/debug_points.h"
 #include "util/slice.h"
 
@@ -47,17 +47,14 @@
 #include <CLucene/store/RAMDirectory.h>
 #include <CLucene/util/Misc.h>
 #include <assert.h>
-// IWYU pragma: no_include <bthread/errno.h>
 #include <errno.h> // IWYU pragma: keep
 #include <glog/logging.h>
 #include <stdio.h>
 #include <string.h>
 #include <wchar.h>
 
-#include <algorithm>
 #include <filesystem>
 #include <iostream>
-#include <memory>
 #include <mutex>
 #include <utility>
 
@@ -76,15 +73,12 @@
         LOG(WARNING) << err;                                     \
         _CLTHROWA(CL_ERR_IO, err.c_str());                       \
     }
-namespace doris {
-namespace segment_v2 {
+namespace doris::segment_v2 {
 
 const char* WRITE_LOCK_FILE = "write.lock";
 const char* COMPOUND_FILE_EXTENSION = ".idx";
 const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
 
-bool DorisCompoundDirectory::disableLocks = false;
-
 DorisCompoundFileWriter::DorisCompoundFileWriter(CL_NS(store)::Directory* dir) {
     if (dir == nullptr) {
         _CLTHROWA(CL_ERR_NullPointer, "directory cannot be null");
@@ -123,7 +117,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     std::string idx_name = std::string(cfs_path.stem().c_str()) + COMPOUND_FILE_EXTENSION;
     // write file entries to ram directory to get header length
     lucene::store::RAMDirectory ram_dir;
-    auto out_idx = ram_dir.createOutput(idx_name.c_str());
+    auto* out_idx = ram_dir.createOutput(idx_name.c_str());
     if (out_idx == nullptr) {
         LOG(WARNING) << "Write compound file error: RAMDirectory output is nullptr.";
         _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
@@ -153,9 +147,9 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     ram_dir.close();
 
     auto compound_fs = ((DorisCompoundDirectory*)directory)->getCompoundFileSystem();
-    auto out_dir = DorisCompoundDirectory::getDirectory(compound_fs, idx_path.c_str(), false);
+    auto* out_dir = DorisCompoundDirectoryFactory::getDirectory(compound_fs, idx_path.c_str());
 
-    auto out = out_dir->createOutput(idx_name.c_str());
+    auto* out = out_dir->createOutput(idx_name.c_str());
     if (out == nullptr) {
         LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr.";
         _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
@@ -261,7 +255,7 @@ bool DorisCompoundDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, co
     if (buffer_size == -1) {
         buffer_size = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE;
     }
-    SharedHandle* h = _CLNEW SharedHandle(path);
+    auto* h = _CLNEW SharedHandle(path);
 
     if (!fs->open_file(path, &h->_reader).ok()) {
         error.set(CL_ERR_IO, "open file error");
@@ -298,7 +292,7 @@ DorisCompoundDirectory::FSIndexInput::FSIndexInput(const FSIndexInput& other)
         _CLTHROWA(CL_ERR_NullPointer, "other handle is null");
     }
 
-    std::lock_guard<doris::Mutex> wlock(*other._handle->_shared_lock);
+    std::lock_guard<std::mutex> wlock(*other._handle->_shared_lock);
     _handle = _CL_POINTER(other._handle);
     _pos = other._handle->_fpos; //note where we are currently...
 }
@@ -307,7 +301,7 @@ DorisCompoundDirectory::FSIndexInput::SharedHandle::SharedHandle(const char* pat
     _length = 0;
     _fpos = 0;
     strcpy(this->path, path);
-    _shared_lock = new doris::Mutex();
+    _shared_lock = new std::mutex();
 }
 
 DorisCompoundDirectory::FSIndexInput::SharedHandle::~SharedHandle() {
@@ -328,10 +322,10 @@ lucene::store::IndexInput* DorisCompoundDirectory::FSIndexInput::clone() const {
 void DorisCompoundDirectory::FSIndexInput::close() {
     BufferedIndexInput::close();
     if (_handle != nullptr) {
-        doris::Mutex* lock = _handle->_shared_lock;
+        std::mutex* lock = _handle->_shared_lock;
         bool ref = false;
         {
-            std::lock_guard<doris::Mutex> wlock(*lock);
+            std::lock_guard<std::mutex> wlock(*lock);
             //determine if we are about to delete the handle...
             ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
             //decdelete (deletes if refcount is down to 0
@@ -354,7 +348,7 @@ void DorisCompoundDirectory::FSIndexInput::seekInternal(const int64_t position)
 void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) {
     CND_PRECONDITION(_handle != nullptr, "shared file handle has closed");
     CND_PRECONDITION(_handle->_reader != nullptr, "file is not open");
-    std::lock_guard<doris::Mutex> wlock(*_handle->_shared_lock);
+    std::lock_guard<std::mutex> wlock(*_handle->_shared_lock);
 
     int64_t position = getFilePointer();
     if (_pos != position) {
@@ -498,10 +492,12 @@ DorisCompoundDirectory::DorisCompoundDirectory() {
 }
 
 void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _path,
+                                  bool use_compound_file_writer,
                                   lucene::store::LockFactory* lock_factory,
                                   const io::FileSystemSPtr& cfs, const char* cfs_path) {
     fs = _fs;
     directory = _path;
+    useCompoundFileWriter = use_compound_file_writer;
 
     if (cfs == nullptr) {
         compound_fs = fs;
@@ -513,17 +509,12 @@ void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _pa
     } else {
         cfs_directory = _path;
     }
-    bool doClearLockID = false;
 
     if (lock_factory == nullptr) {
         lock_factory = _CLNEW lucene::store::NoLockFactory();
     }
 
-    setLockFactory(lock_factory);
-
-    if (doClearLockID) {
-        lockFactory->setLockPrefix(nullptr);
-    }
+    lucene::store::Directory::setLockFactory(lock_factory);
 
     // It's fail checking directory existence in S3.
     if (fs->type() == io::FileSystemType::S3) {
@@ -538,24 +529,6 @@ void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _pa
     }
 }
 
-void DorisCompoundDirectory::create() {
-    std::lock_guard<doris::Mutex> wlock(_this_lock);
-
-    //clear old files
-    std::vector<std::string> files;
-    lucene::util::Misc::listFiles(directory.c_str(), files, false);
-    std::vector<std::string>::iterator itr = files.begin();
-    while (itr != files.end()) {
-        if (CL_NS(index)::IndexReader::isLuceneFile(itr->c_str())) {
-            if (unlink((directory + PATH_DELIMITERA + *itr).c_str()) == -1) {
-                _CLTHROWA(CL_ERR_IO, "Couldn't delete file ");
-            }
-        }
-        itr++;
-    }
-    lockFactory->clearLock(CL_NS(index)::IndexWriter::WRITE_LOCK_NAME);
-}
-
 void DorisCompoundDirectory::priv_getFN(char* buffer, const char* name) const {
     buffer[0] = 0;
     strcpy(buffer, directory.c_str());
@@ -598,45 +571,6 @@ const char* DorisCompoundDirectory::getCfsDirName() const {
     return cfs_directory.c_str();
 }
 
-DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(const io::FileSystemSPtr& fs,
-                                                             const char* file,
-                                                             bool use_compound_file_writer,
-                                                             const io::FileSystemSPtr& cfs_fs,
-                                                             const char* cfs_file) {
-    DorisCompoundDirectory* dir =
-            getDirectory(fs, file, (lucene::store::LockFactory*)nullptr, cfs_fs, cfs_file);
-    dir->useCompoundFileWriter = use_compound_file_writer;
-    return dir;
-}
-
-//static
-DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(
-        const io::FileSystemSPtr& _fs, const char* _file, lucene::store::LockFactory* lock_factory,
-        const io::FileSystemSPtr& _cfs, const char* _cfs_file) {
-    const char* cfs_file = _cfs_file;
-    if (cfs_file == nullptr) {
-        cfs_file = _file;
-    }
-    DorisCompoundDirectory* dir = nullptr;
-    if (!_file || !*_file) {
-        _CLTHROWA(CL_ERR_IO, "Invalid directory");
-    }
-
-    const char* file = _file;
-
-    bool exists = false;
-    LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error")
-    if (!exists) {
-        LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
-                               "Get directory create directory IO error")
-    }
-
-    dir = _CLNEW DorisCompoundDirectory();
-    dir->init(_fs, file, lock_factory, _cfs, cfs_file);
-
-    return dir;
-}
-
 int64_t DorisCompoundDirectory::fileModified(const char* name) const {
     CND_PRECONDITION(directory[0] != 0, "directory is not open");
     struct stat buf;
@@ -677,7 +611,7 @@ bool DorisCompoundDirectory::openInput(const char* name, lucene::store::IndexInp
 
 void DorisCompoundDirectory::close() {
     if (useCompoundFileWriter) {
-        DorisCompoundFileWriter* cfsWriter = _CLNEW DorisCompoundFileWriter(this);
+        auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this);
         // write compound file
         cfsWriter->writeCompoundFile();
         // delete index path, which contains separated inverted index files
@@ -705,7 +639,7 @@ bool DorisCompoundDirectory::deleteDirectory() {
 
 void DorisCompoundDirectory::renameFile(const char* from, const char* to) {
     CND_PRECONDITION(directory[0] != 0, "directory is not open");
-    std::lock_guard<doris::Mutex> wlock(_this_lock);
+    std::lock_guard<std::mutex> wlock(_this_lock);
     char old[CL_MAX_DIR];
     priv_getFN(old, from);
 
@@ -733,7 +667,7 @@ lucene::store::IndexOutput* DorisCompoundDirectory::createOutput(const char* nam
         LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "Create output file exists IO error")
         assert(!exists);
     }
-    auto ret = _CLNEW FSIndexOutput();
+    auto* ret = _CLNEW FSIndexOutput();
     try {
         ret->init(fs, fl);
     } catch (CLuceneError& err) {
@@ -747,5 +681,226 @@ std::string DorisCompoundDirectory::toString() const {
     return std::string("DorisCompoundDirectory@") + this->directory;
 }
 
-} // namespace segment_v2
-} // namespace doris
+/**
+ * DorisRAMCompoundDirectory
+ */
+DorisRAMCompoundDirectory::DorisRAMCompoundDirectory() {
+    filesMap = _CLNEW FileMap(true, true);
+    this->sizeInBytes = 0;
+}
+
+DorisRAMCompoundDirectory::~DorisRAMCompoundDirectory() {
+    _CLDELETE(lockFactory);
+    _CLDELETE(filesMap);
+}
+
+void DorisRAMCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _path,
+                                     bool use_compound_file_writer,
+                                     lucene::store::LockFactory* lock_factory,
+                                     const io::FileSystemSPtr& cfs, const char* cfs_path) {
+    fs = _fs;
+    directory = _path;
+    useCompoundFileWriter = use_compound_file_writer;
+
+    if (cfs == nullptr) {
+        compound_fs = fs;
+    } else {
+        compound_fs = cfs;
+    }
+    if (cfs_path != nullptr) {
+        cfs_directory = cfs_path;
+    } else {
+        cfs_directory = _path;
+    }
+
+    lucene::store::Directory::setLockFactory(_CLNEW lucene::store::SingleInstanceLockFactory());
+}
+
+bool DorisRAMCompoundDirectory::list(std::vector<std::string>* names) const {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto itr = filesMap->begin();
+    while (itr != filesMap->end()) {
+        names->emplace_back(itr->first);
+        ++itr;
+    }
+    return true;
+}
+
+bool DorisRAMCompoundDirectory::fileExists(const char* name) const {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    return filesMap->exists((char*)name);
+}
+
+int64_t DorisRAMCompoundDirectory::fileModified(const char* name) const {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto* f = filesMap->get((char*)name);
+    return f->getLastModified();
+}
+
+void DorisRAMCompoundDirectory::touchFile(const char* name) {
+    lucene::store::RAMFile* file = nullptr;
+    {
+        std::lock_guard<std::mutex> wlock(_this_lock);
+        file = filesMap->get((char*)name);
+    }
+    const uint64_t ts1 = file->getLastModified();
+    uint64_t ts2 = lucene::util::Misc::currentTimeMillis();
+
+    //make sure that the time has actually changed
+    while (ts1 == ts2) {
+        _LUCENE_SLEEP(1);
+        ts2 = lucene::util::Misc::currentTimeMillis();
+    };
+
+    file->setLastModified(ts2);
+}
+
+int64_t DorisRAMCompoundDirectory::fileLength(const char* name) const {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto* f = filesMap->get((char*)name);
+    return f->getLength();
+}
+
+bool DorisRAMCompoundDirectory::openInput(const char* name, lucene::store::IndexInput*& ret,
+                                          CLuceneError& error, int32_t bufferSize) {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto* file = filesMap->get((char*)name);
+    if (file == nullptr) {
+        error.set(CL_ERR_IO,
+                  "[DorisRAMCompoundDirectory::open] The requested file does not exist.");
+        return false;
+    }
+    ret = _CLNEW lucene::store::RAMInputStream(file);
+    return true;
+}
+
+void DorisRAMCompoundDirectory::close() {
+    // write compound file
+    DorisCompoundDirectory::close();
+
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    filesMap->clear();
+    _CLDELETE(filesMap);
+}
+
+bool DorisRAMCompoundDirectory::doDeleteFile(const char* name) {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto itr = filesMap->find((char*)name);
+    if (itr != filesMap->end()) {
+        SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+        sizeInBytes -= itr->second->sizeInBytes;
+        filesMap->removeitr(itr);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool DorisRAMCompoundDirectory::deleteDirectory() {
+    // do nothing, RAM dir do not have actual files
+    return true;
+}
+
+void DorisRAMCompoundDirectory::renameFile(const char* from, const char* to) {
+    std::lock_guard<std::mutex> wlock(_this_lock);
+    auto itr = filesMap->find((char*)from);
+
+    /* DSR:CL_BUG_LEAK:
+    ** If a file named $to already existed, its old value was leaked.
+    ** My inclination would be to prevent this implicit deletion with an
+    ** exception, but it happens routinely in CLucene's internals (e.g., during
+    ** IndexWriter.addIndexes with the file named 'segments'). */
+    if (filesMap->exists((char*)to)) {
+        auto itr1 = filesMap->find((char*)to);
+        SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+        sizeInBytes -= itr1->second->sizeInBytes;
+        filesMap->removeitr(itr1);
+    }
+    if (itr == filesMap->end()) {
+        char tmp[1024];
+        snprintf(tmp, 1024, "cannot rename %s, file does not exist", from);
+        _CLTHROWT(CL_ERR_IO, tmp);
+    }
+    DCHECK(itr != filesMap->end());
+    auto* file = itr->second;
+    filesMap->removeitr(itr, false, true);
+    filesMap->put(strdup(to), file);
+}
+
+lucene::store::IndexOutput* DorisRAMCompoundDirectory::createOutput(const char* name) {
+    /* Check the $filesMap VoidMap to see if there was a previous file named
+    ** $name.  If so, delete the old RAMFile object, but reuse the existing
+    ** char buffer ($n) that holds the filename.  If not, duplicate the
+    ** supplied filename buffer ($name) and pass ownership of that memory ($n)
+    ** to $files. */
+    std::lock_guard<std::mutex> wlock(_this_lock);
+
+    // get the actual pointer to the output name
+    char* n = nullptr;
+    auto itr = filesMap->find(const_cast<char*>(name));
+    if (itr != filesMap->end()) {
+        n = itr->first;
+        lucene::store::RAMFile* rf = itr->second;
+        SCOPED_LOCK_MUTEX(this->THIS_LOCK);
+        sizeInBytes -= rf->sizeInBytes;
+        _CLDELETE(rf);
+    } else {
+        n = STRDUP_AtoA(name);
+    }
+
+    auto* file = _CLNEW lucene::store::RAMFile();
+    (*filesMap)[n] = file;
+
+    return _CLNEW lucene::store::RAMOutputStream(file);
+}
+
+std::string DorisRAMCompoundDirectory::toString() const {
+    return std::string("DorisRAMCompoundDirectory@") + this->directory;
+}
+
+const char* DorisRAMCompoundDirectory::getClassName() {
+    return "DorisRAMCompoundDirectory";
+}
+
+const char* DorisRAMCompoundDirectory::getObjectName() const {
+    return getClassName();
+}
+
+/**
+ * DorisCompoundDirectoryFactory
+ */
+DorisCompoundDirectory* DorisCompoundDirectoryFactory::getDirectory(
+        const io::FileSystemSPtr& _fs, const char* _file, bool use_compound_file_writer,
+        bool can_use_ram_dir, lucene::store::LockFactory* lock_factory,
+        const io::FileSystemSPtr& _cfs, const char* _cfs_file) {
+    const char* cfs_file = _cfs_file;
+    if (cfs_file == nullptr) {
+        cfs_file = _file;
+    }
+    DorisCompoundDirectory* dir = nullptr;
+    if (!_file || !*_file) {
+        _CLTHROWA(CL_ERR_IO, "Invalid directory");
+    }
+
+    const char* file = _file;
+
+    // Write by RAM directory
+    // 1. only write separated index files, which is can_use_ram_dir = true.
+    // 2. config::inverted_index_ram_dir_enable = true
+    if (config::inverted_index_ram_dir_enable && can_use_ram_dir) {
+        dir = _CLNEW DorisRAMCompoundDirectory();
+    } else {
+        bool exists = false;
+        LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error")
+        if (!exists) {
+            LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
+                                   "Get directory create directory IO error")
+        }
+        dir = _CLNEW DorisCompoundDirectory();
+    }
+    dir->init(_fs, file, use_compound_file_writer, lock_factory, _cfs, cfs_file);
+
+    return dir;
+}
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index b06f980f6bd..7ae0e618a45 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -21,6 +21,7 @@
 #include <CLucene/store/Directory.h>
 #include <CLucene/store/IndexInput.h>
 #include <CLucene/store/IndexOutput.h>
+#include <CLucene/store/_RAMDirectory.h>
 #include <stdint.h>
 
 #include <string>
@@ -29,19 +30,15 @@
 #include "CLucene/SharedHeader.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/file_system.h"
-#include "util/lock.h"
+#include "io/io_common.h"
 
 class CLuceneError;
 
-namespace lucene {
-namespace store {
+namespace lucene::store {
 class LockFactory;
-} // namespace store
-} // namespace lucene
+} // namespace lucene::store
 
-namespace doris {
-
-namespace segment_v2 {
+namespace doris::segment_v2 {
 
 class DorisCompoundFileWriter : LUCENE_BASE {
 public:
@@ -61,26 +58,15 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
 private:
     int filemode;
 
-    doris::Mutex _this_lock;
-
 protected:
-    DorisCompoundDirectory();
-    virtual void init(const io::FileSystemSPtr& fs, const char* path,
-                      lucene::store::LockFactory* lock_factory = nullptr,
-                      const io::FileSystemSPtr& compound_fs = nullptr,
-                      const char* cfs_path = nullptr);
-    void priv_getFN(char* buffer, const char* name) const;
-
-private:
+    mutable std::mutex _this_lock;
     io::FileSystemSPtr fs;
     io::FileSystemSPtr compound_fs;
     std::string directory;
     std::string cfs_directory;
-    void create();
-    static bool disableLocks;
     bool useCompoundFileWriter {false};
 
-protected:
+    void priv_getFN(char* buffer, const char* name) const;
     /// Removes an existing file in the directory.
     bool doDeleteFile(const char* name) override;
 
@@ -98,16 +84,6 @@ public:
     bool list(std::vector<std::string>* names) const override;
     bool fileExists(const char* name) const override;
     const char* getCfsDirName() const;
-    static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs, const char* file,
-                                                lucene::store::LockFactory* lock_factory = nullptr,
-                                                const io::FileSystemSPtr& cfs_fs = nullptr,
-                                                const char* cfs_file = nullptr);
-
-    static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs, const char* file,
-                                                bool use_compound_file_writer,
-                                                const io::FileSystemSPtr& 
cfs_fs = nullptr,
-                                                const char* cfs_file = 
nullptr);
-
     int64_t fileModified(const char* name) const override;
     int64_t fileLength(const char* name) const override;
     bool openInput(const char* name, lucene::store::IndexInput*& ret, 
CLuceneError& err,
@@ -119,7 +95,77 @@ public:
     std::string toString() const override;
     static const char* getClassName();
     const char* getObjectName() const override;
-    bool deleteDirectory();
+    virtual bool deleteDirectory();
+
+    DorisCompoundDirectory();
+
+    virtual void init(const io::FileSystemSPtr& fs, const char* path, bool 
use_compound_file_writer,
+                      lucene::store::LockFactory* lock_factory = nullptr,
+                      const io::FileSystemSPtr& compound_fs = nullptr,
+                      const char* cfs_path = nullptr);
+};
+
+class CLUCENE_EXPORT DorisRAMCompoundDirectory : public DorisCompoundDirectory 
{
+protected:
+    using FileMap =
+            lucene::util::CLHashMap<char*, lucene::store::RAMFile*, 
lucene::util::Compare::Char,
+                                    lucene::util::Equals::Char, 
lucene::util::Deletor::acArray,
+                                    
lucene::util::Deletor::Object<lucene::store::RAMFile>>;
+
+    // unlike the java Hashtable, FileMap is not synchronized, and all access 
must be protected by a lock
+    FileMap* filesMap;
+    void init(const io::FileSystemSPtr& fs, const char* path, bool 
use_compound_file_writer,
+              lucene::store::LockFactory* lock_factory = nullptr,
+              const io::FileSystemSPtr& compound_fs = nullptr,
+              const char* cfs_path = nullptr) override;
+
+public:
+    int64_t sizeInBytes;
+
+    /// Returns a null terminated array of strings, one for each file in the 
directory.
+    bool list(std::vector<std::string>* names) const override;
+
+    /** Constructs an empty {@link Directory}. */
+    DorisRAMCompoundDirectory();
+
+    ///Destructor - only call this if you are sure the directory
+    ///is not being used anymore. Otherwise use the ref-counting
+    ///facilities of dir->close
+    ~DorisRAMCompoundDirectory() override;
+
+    bool doDeleteFile(const char* name) override;
+
+    bool deleteDirectory() override;
+
+    /// Returns true iff the named file exists in this directory.
+    bool fileExists(const char* name) const override;
+
+    /// Returns the time the named file was last modified.
+    int64_t fileModified(const char* name) const override;
+
+    /// Returns the length in bytes of a file in the directory.
+    int64_t fileLength(const char* name) const override;
+
+    /// Removes an existing file in the directory.
+    void renameFile(const char* from, const char* to) override;
+
+    /** Set the modified time of an existing file to now. */
+    void touchFile(const char* name) override;
+
+    /// Creates a new, empty file in the directory with the given name.
+    ///        Returns a stream writing this file.
+    lucene::store::IndexOutput* createOutput(const char* name) override;
+
+    /// Returns a stream reading an existing file.
+    bool openInput(const char* name, lucene::store::IndexInput*& ret, 
CLuceneError& error,
+                   int32_t bufferSize = -1) override;
+
+    void close() override;
+
+    std::string toString() const override;
+
+    static const char* getClassName();
+    const char* getObjectName() const override;
 };
 
 class DorisCompoundDirectory::FSIndexInput : public 
lucene::store::BufferedIndexInput {
@@ -128,13 +174,13 @@ class DorisCompoundDirectory::FSIndexInput : public 
lucene::store::BufferedIndex
         io::FileReaderSPtr _reader;
         uint64_t _length;
         int64_t _fpos;
-        doris::Mutex* _shared_lock;
+        std::mutex* _shared_lock;
         char path[4096];
         SharedHandle(const char* path);
         ~SharedHandle() override;
     };
 
-    SharedHandle* _handle;
+    SharedHandle* _handle = nullptr;
     int64_t _pos;
 
     FSIndexInput(SharedHandle* handle, int32_t buffer_size) : 
BufferedIndexInput(buffer_size) {
@@ -158,7 +204,7 @@ public:
     const char* getObjectName() const override { return getClassName(); }
     static const char* getClassName() { return "FSIndexInput"; }
 
-    doris::Mutex _this_lock;
+    std::mutex _this_lock;
 
 protected:
     // Random-access methods
@@ -167,5 +213,16 @@ protected:
     void readInternal(uint8_t* b, const int32_t len) override;
 };
 
-} // namespace segment_v2
-} // namespace doris
+/**
+ * Factory function to create DorisCompoundDirectory
+ */
+class DorisCompoundDirectoryFactory {
+public:
+    static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs, 
const char* file,
+                                                bool use_compound_file_writer 
= false,
+                                                bool can_use_ram_dir = false,
+                                                lucene::store::LockFactory* 
lock_factory = nullptr,
+                                                const io::FileSystemSPtr& 
cfs_fs = nullptr,
+                                                const char* cfs_file = 
nullptr);
+};
+} // namespace doris::segment_v2
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 6e1ebb53a9d..7934c63286d 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -197,7 +197,7 @@ Status 
InvertedIndexReader::read_null_bitmap(InvertedIndexQueryCacheHandle* cach
 
         if (!dir) {
             dir = new DorisCompoundReader(
-                    DorisCompoundDirectory::getDirectory(_fs, 
index_dir.c_str()),
+                    DorisCompoundDirectoryFactory::getDirectory(_fs, 
index_dir.c_str()),
                     index_file_name.c_str(), 
config::inverted_index_read_buffer_size);
             owned_dir = true;
         }
@@ -734,8 +734,8 @@ BkdIndexReader::BkdIndexReader(io::FileSystemSPtr fs, const 
std::string& path,
     }
     _file_full_path = index_file;
     _compoundReader = std::make_unique<DorisCompoundReader>(
-            DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), 
index_file_name.c_str(),
-            config::inverted_index_read_buffer_size);
+            DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()),
+            index_file_name.c_str(), config::inverted_index_read_buffer_size);
 }
 
 Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState* 
runtime_state,
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 744710d9082..d82f6f6b556 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -64,7 +64,6 @@ const int32_t MERGE_FACTOR = 100000000;
 const int32_t MAX_LEAF_COUNT = 1024;
 const float MAXMBSortInHeap = 512.0 * 8;
 const int DIMS = 1;
-const std::string empty_value;
 
 template <FieldType field_type>
 class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
@@ -162,7 +161,10 @@ public:
         }
 
         _doc = std::make_unique<lucene::document::Document>();
-        _dir.reset(DorisCompoundDirectory::getDirectory(_fs, 
index_path.c_str(), true));
+        bool use_compound_file_writer = true;
+        bool can_use_ram_dir = true;
+        _dir.reset(DorisCompoundDirectoryFactory::getDirectory(
+                _fs, index_path.c_str(), use_compound_file_writer, 
can_use_ram_dir));
 
         if (_parser_type == InvertedIndexParserType::PARSER_STANDARD ||
             _parser_type == InvertedIndexParserType::PARSER_UNICODE) {
@@ -296,7 +298,7 @@ public:
                     
get_parser_ignore_above_value_from_properties(_index_meta->properties());
             auto ignore_above = std::stoi(ignore_above_value);
             for (int i = 0; i < count; ++i) {
-                // only ignore_above UNTOKENIZED strings
+                // only ignore_above UNTOKENIZED strings and empty strings not 
tokenized
                 if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
                      v->get_size() > ignore_above) ||
                     (_parser_type != InvertedIndexParserType::PARSER_NONE && 
v->empty())) {
@@ -346,7 +348,7 @@ public:
                 }
 
                 auto value = join(strings, " ");
-                // only ignore_above UNTOKENIZED strings
+                // only ignore_above UNTOKENIZED strings and empty strings not 
tokenized
                 if ((_parser_type == InvertedIndexParserType::PARSER_NONE &&
                      value.length() > ignore_above) ||
                     (_parser_type != InvertedIndexParserType::PARSER_NONE && 
value.empty())) {
@@ -493,7 +495,10 @@ public:
             if constexpr (field_is_numeric_type(field_type)) {
                 auto index_path = 
InvertedIndexDescriptor::get_temporary_index_path(
                         _directory + "/" + _segment_file_name, 
_index_meta->index_id());
-                dir = DorisCompoundDirectory::getDirectory(_fs, 
index_path.c_str(), true);
+                bool use_compound_file_writer = true;
+                bool can_use_ram_dir = true;
+                dir = DorisCompoundDirectoryFactory::getDirectory(
+                        _fs, index_path.c_str(), use_compound_file_writer, 
can_use_ram_dir);
                 write_null_bitmap(null_bitmap_out, dir);
                 _bkd_writer->max_doc_ = _rid;
                 _bkd_writer->docs_seen_ = _row_ids_seen_for_bkd;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to