This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 0186f7313b2 [feature](inverted index)write separated index files in RAM directory to reduce IO #28810 (#29305) 0186f7313b2 is described below commit 0186f7313b249a842f3df7ac685ce0bbd16b2db1 Author: qiye <jianliang5...@gmail.com> AuthorDate: Sat Dec 30 09:08:22 2023 +0800 [feature](inverted index)write separated index files in RAM directory to reduce IO #28810 (#29305) --- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/index-tools/index_tool.cpp | 8 +- be/src/olap/compaction.cpp | 3 +- .../rowset/segment_v2/inverted_index_cache.cpp | 8 +- .../segment_v2/inverted_index_compaction.cpp | 18 +- .../inverted_index_compound_directory.cpp | 325 +++++++++++++++------ .../segment_v2/inverted_index_compound_directory.h | 131 ++++++--- .../rowset/segment_v2/inverted_index_reader.cpp | 6 +- .../rowset/segment_v2/inverted_index_writer.cpp | 15 +- 10 files changed, 369 insertions(+), 149 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index af08d2deaab..a762d0de392 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1007,6 +1007,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096"); DEFINE_Int32(max_depth_in_bkd_tree, "32"); // index compaction DEFINE_mBool(inverted_index_compaction_enable, "false"); +// index by RAM directory +DEFINE_mBool(inverted_index_ram_dir_enable, "false"); // use num_broadcast_buffer blocks as buffer to do broadcast DEFINE_Int32(num_broadcast_buffer, "32"); // semi-structure configs diff --git a/be/src/common/config.h b/be/src/common/config.h index 4fc3bc8dbfa..f71aad99ab3 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1042,6 +1042,8 @@ DECLARE_mInt32(inverted_index_max_buffered_docs); DECLARE_Int32(max_depth_in_bkd_tree); // index compaction DECLARE_mBool(inverted_index_compaction_enable); +// index by RAM directory +DECLARE_mBool(inverted_index_ram_dir_enable); // use num_broadcast_buffer blocks as buffer to do broadcast DECLARE_Int32(num_broadcast_buffer); // semi-structure configs diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp index 9cd72795ef9..a5ecf9b996f 100644 --- a/be/src/index-tools/index_tool.cpp +++ b/be/src/index-tools/index_tool.cpp @@ -30,7 +30,7 @@ #include "olap/rowset/segment_v2/inverted_index_compound_reader.h" using doris::segment_v2::DorisCompoundReader; -using doris::segment_v2::DorisCompoundDirectory; +using doris::segment_v2::DorisCompoundDirectoryFactory; using doris::io::FileInfo; using namespace lucene::analysis; using namespace lucene::index; @@ -150,7 +150,7 @@ int main(int argc, char** argv) { auto fs = doris::io::global_local_filesystem(); try { lucene::store::Directory* dir = - DorisCompoundDirectory::getDirectory(fs, dir_str.c_str()); + DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str()); auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096); std::vector<std::string> files; std::cout << "Nested files for " << file_str << std::endl; @@ -173,7 +173,7 @@ int main(int argc, char** argv) { auto fs = doris::io::global_local_filesystem(); try { lucene::store::Directory* dir = - DorisCompoundDirectory::getDirectory(fs, dir_str.c_str()); + DorisCompoundDirectoryFactory::getDirectory(fs, dir_str.c_str()); auto reader = new DorisCompoundReader(dir, file_str.c_str(), 4096); std::cout << "Term statistics for " << file_str << std::endl; std::cout << "==================================" << std::endl; @@ -190,7 +190,7 @@ int main(int argc, char** argv) { auto fs = doris::io::global_local_filesystem(); try { lucene::store::Directory* dir = - DorisCompoundDirectory::getDirectory(fs, FLAGS_directory.c_str()); + DorisCompoundDirectoryFactory::getDirectory(fs, FLAGS_directory.c_str()); if (FLAGS_idx_file_name == "") { //try to search from directory's all files std::vector<FileInfo> files; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index f6c8b3bb5bf..2cbd354b6de 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -633,7 +633,8 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool std::string dir_str = p.parent_path().string(); std::string file_str = p.filename().string(); lucene::store::Directory* dir = - DorisCompoundDirectory::getDirectory(fs, dir_str.c_str()); + DorisCompoundDirectoryFactory::getDirectory( + fs, dir_str.c_str()); DorisCompoundReader reader(dir, file_str.c_str()); std::vector<std::string> files; reader.list(&files); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp index f3c68984ebb..0804fcca2b2 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_cache.cpp @@ -43,9 +43,9 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::_s_instance = nullptr; IndexSearcherPtr InvertedIndexSearcherCache::build_index_searcher(const io::FileSystemSPtr& fs, const std::string& index_dir, const std::string& file_name) { - DorisCompoundReader* directory = - new DorisCompoundReader(DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), - file_name.c_str(), config::inverted_index_read_buffer_size); + DorisCompoundReader* directory = new DorisCompoundReader( + DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()), file_name.c_str(), + config::inverted_index_read_buffer_size); auto closeDirectory = true; auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(directory, closeDirectory); @@ -190,7 +190,7 @@ int64_t InvertedIndexSearcherCache::mem_consumption() { bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key, InvertedIndexCacheHandle* handle) { - auto lru_handle = _cache->lookup(key.index_file_path); + auto* lru_handle = _cache->lookup(key.index_file_path); if (lru_handle == nullptr) { return false; } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index b3a28c6ebfc..8458de9e7e3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -22,8 +22,7 @@ #include "inverted_index_compound_directory.h" #include "inverted_index_compound_reader.h" -namespace doris { -namespace segment_v2 { +namespace doris::segment_v2 { Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_num, std::vector<std::string> src_index_files, std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs, @@ -31,7 +30,7 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec, std::vector<uint32_t> dest_segment_num_rows) { lucene::store::Directory* dir = - DorisCompoundDirectory::getDirectory(fs, index_writer_path.c_str(), false); + DorisCompoundDirectoryFactory::getDirectory(fs, index_writer_path.c_str()); lucene::analysis::SimpleAnalyzer<char> analyzer; auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */, true /* closeDirOnShutdown */); @@ -42,8 +41,8 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu // format: rowsetId_segmentId_indexId.idx std::string src_idx_full_name = src_index_files[i] + "_" + std::to_string(index_id) + ".idx"; - DorisCompoundReader* reader = new DorisCompoundReader( - DorisCompoundDirectory::getDirectory(fs, tablet_path.c_str()), + auto* reader = new DorisCompoundReader( + DorisCompoundDirectoryFactory::getDirectory(fs, tablet_path.c_str()), src_idx_full_name.c_str()); src_index_dirs[i] = reader; } @@ -53,7 +52,7 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu for (int i = 0; i < dest_segment_num; ++i) { // format: rowsetId_segmentId_columnId auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id); - dest_index_dirs[i] = DorisCompoundDirectory::getDirectory(fs, path.c_str(), true); + dest_index_dirs[i] = DorisCompoundDirectoryFactory::getDirectory(fs, path.c_str(), true); } DCHECK_EQ(src_index_dirs.size(), trans_vec.size()); @@ -66,13 +65,13 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu // when index_writer is destroyed, if closeDir is set, dir will be close // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed. _CLDECDELETE(dir) - for (auto d : src_index_dirs) { + for (auto* d : src_index_dirs) { if (d != nullptr) { d->close(); _CLDELETE(d); } } - for (auto d : dest_index_dirs) { + for (auto* d : dest_index_dirs) { if (d != nullptr) { // NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize. //d->close(); @@ -84,5 +83,4 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu fs->delete_directory(index_writer_path.c_str()); return Status::OK(); } -} // namespace segment_v2 -} // namespace doris +} // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp index cba340a26f4..0796102e908 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp @@ -18,10 +18,10 @@ #include "olap/rowset/segment_v2/inverted_index_compound_directory.h" #include "CLucene/SharedHeader.h" +#include "CLucene/_SharedHeader.h" #include "common/status.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" -#include "io/fs/path.h" #include "util/debug_points.h" #include "util/slice.h" @@ -47,17 +47,14 @@ #include <CLucene/store/RAMDirectory.h> #include <CLucene/util/Misc.h> #include <assert.h> -// IWYU pragma: no_include <bthread/errno.h> #include <errno.h> // IWYU pragma: keep #include <glog/logging.h> #include <stdio.h> #include <string.h> #include <wchar.h> -#include <algorithm> #include <filesystem> #include <iostream> -#include <memory> #include <mutex> #include <utility> @@ -76,15 +73,12 @@ LOG(WARNING) << err; \ _CLTHROWA(CL_ERR_IO, err.c_str()); \ } -namespace doris { -namespace segment_v2 { +namespace doris::segment_v2 { const char* WRITE_LOCK_FILE = "write.lock"; const char* COMPOUND_FILE_EXTENSION = ".idx"; const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k -bool DorisCompoundDirectory::disableLocks = false; - DorisCompoundFileWriter::DorisCompoundFileWriter(CL_NS(store)::Directory* dir) { if (dir == nullptr) { _CLTHROWA(CL_ERR_NullPointer, "directory cannot be null"); @@ -123,7 +117,7 @@ void DorisCompoundFileWriter::writeCompoundFile() { std::string idx_name = std::string(cfs_path.stem().c_str()) + COMPOUND_FILE_EXTENSION; // write file entries to ram directory to get header length lucene::store::RAMDirectory ram_dir; - auto out_idx = ram_dir.createOutput(idx_name.c_str()); + auto* out_idx = ram_dir.createOutput(idx_name.c_str()); if (out_idx == nullptr) { LOG(WARNING) << "Write compound file error: RAMDirectory output is nullptr."; _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error"); @@ -153,9 +147,9 @@ void DorisCompoundFileWriter::writeCompoundFile() { ram_dir.close(); auto compound_fs = ((DorisCompoundDirectory*)directory)->getCompoundFileSystem(); - auto out_dir = DorisCompoundDirectory::getDirectory(compound_fs, idx_path.c_str(), false); + auto* out_dir = DorisCompoundDirectoryFactory::getDirectory(compound_fs, idx_path.c_str()); - auto out = out_dir->createOutput(idx_name.c_str()); + auto* out = out_dir->createOutput(idx_name.c_str()); if (out == nullptr) { LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr."; _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); @@ -261,7 +255,7 @@ bool DorisCompoundDirectory::FSIndexInput::open(const io::FileSystemSPtr& fs, co if (buffer_size == -1) { buffer_size = CL_NS(store)::BufferedIndexOutput::BUFFER_SIZE; } - SharedHandle* h = _CLNEW SharedHandle(path); + auto* h = _CLNEW SharedHandle(path); if (!fs->open_file(path, &h->_reader).ok()) { error.set(CL_ERR_IO, "open file error"); @@ -298,7 +292,7 @@ DorisCompoundDirectory::FSIndexInput::FSIndexInput(const FSIndexInput& other) _CLTHROWA(CL_ERR_NullPointer, "other handle is null"); } - std::lock_guard<doris::Mutex> wlock(*other._handle->_shared_lock); + std::lock_guard<std::mutex> wlock(*other._handle->_shared_lock); _handle = _CL_POINTER(other._handle); _pos = other._handle->_fpos; //note where we are currently... } @@ -307,7 +301,7 @@ DorisCompoundDirectory::FSIndexInput::SharedHandle::SharedHandle(const char* pat _length = 0; _fpos = 0; strcpy(this->path, path); - _shared_lock = new doris::Mutex(); + _shared_lock = new std::mutex(); } DorisCompoundDirectory::FSIndexInput::SharedHandle::~SharedHandle() { @@ -328,10 +322,10 @@ lucene::store::IndexInput* DorisCompoundDirectory::FSIndexInput::clone() const { void DorisCompoundDirectory::FSIndexInput::close() { BufferedIndexInput::close(); if (_handle != nullptr) { - doris::Mutex* lock = _handle->_shared_lock; + std::mutex* lock = _handle->_shared_lock; bool ref = false; { - std::lock_guard<doris::Mutex> wlock(*lock); + std::lock_guard<std::mutex> wlock(*lock); //determine if we are about to delete the handle... ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1); //decdelete (deletes if refcount is down to 0 @@ -354,7 +348,7 @@ void DorisCompoundDirectory::FSIndexInput::seekInternal(const int64_t position) void DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) { CND_PRECONDITION(_handle != nullptr, "shared file handle has closed"); CND_PRECONDITION(_handle->_reader != nullptr, "file is not open"); - std::lock_guard<doris::Mutex> wlock(*_handle->_shared_lock); + std::lock_guard<std::mutex> wlock(*_handle->_shared_lock); int64_t position = getFilePointer(); if (_pos != position) { @@ -498,10 +492,12 @@ DorisCompoundDirectory::DorisCompoundDirectory() { } void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _path, + bool use_compound_file_writer, lucene::store::LockFactory* lock_factory, const io::FileSystemSPtr& cfs, const char* cfs_path) { fs = _fs; directory = _path; + useCompoundFileWriter = use_compound_file_writer; if (cfs == nullptr) { compound_fs = fs; @@ -513,17 +509,12 @@ void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _pa } else { cfs_directory = _path; } - bool doClearLockID = false; if (lock_factory == nullptr) { lock_factory = _CLNEW lucene::store::NoLockFactory(); } - setLockFactory(lock_factory); - - if (doClearLockID) { - lockFactory->setLockPrefix(nullptr); - } + lucene::store::Directory::setLockFactory(lock_factory); // It's fail checking directory existence in S3. if (fs->type() == io::FileSystemType::S3) { @@ -538,24 +529,6 @@ void DorisCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _pa } } -void DorisCompoundDirectory::create() { - std::lock_guard<doris::Mutex> wlock(_this_lock); - - //clear old files - std::vector<std::string> files; - lucene::util::Misc::listFiles(directory.c_str(), files, false); - std::vector<std::string>::iterator itr = files.begin(); - while (itr != files.end()) { - if (CL_NS(index)::IndexReader::isLuceneFile(itr->c_str())) { - if (unlink((directory + PATH_DELIMITERA + *itr).c_str()) == -1) { - _CLTHROWA(CL_ERR_IO, "Couldn't delete file "); - } - } - itr++; - } - lockFactory->clearLock(CL_NS(index)::IndexWriter::WRITE_LOCK_NAME); -} - void DorisCompoundDirectory::priv_getFN(char* buffer, const char* name) const { buffer[0] = 0; strcpy(buffer, directory.c_str()); @@ -598,45 +571,6 @@ const char* DorisCompoundDirectory::getCfsDirName() const { return cfs_directory.c_str(); } -DorisCompoundDirectory* DorisCompoundDirectory::getDirectory(const io::FileSystemSPtr& fs, - const char* file, - bool use_compound_file_writer, - const io::FileSystemSPtr& cfs_fs, - const char* cfs_file) { - DorisCompoundDirectory* dir = - getDirectory(fs, file, (lucene::store::LockFactory*)nullptr, cfs_fs, cfs_file); - dir->useCompoundFileWriter = use_compound_file_writer; - return dir; -} - -//static -DorisCompoundDirectory* DorisCompoundDirectory::getDirectory( - const io::FileSystemSPtr& _fs, const char* _file, lucene::store::LockFactory* lock_factory, - const io::FileSystemSPtr& _cfs, const char* _cfs_file) { - const char* cfs_file = _cfs_file; - if (cfs_file == nullptr) { - cfs_file = _file; - } - DorisCompoundDirectory* dir = nullptr; - if (!_file || !*_file) { - _CLTHROWA(CL_ERR_IO, "Invalid directory"); - } - - const char* file = _file; - - bool exists = false; - LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error") - if (!exists) { - LOG_AND_THROW_IF_ERROR(_fs->create_directory(file), - "Get directory create directory IO error") - } - - dir = _CLNEW DorisCompoundDirectory(); - dir->init(_fs, file, lock_factory, _cfs, cfs_file); - - return dir; -} - int64_t DorisCompoundDirectory::fileModified(const char* name) const { CND_PRECONDITION(directory[0] != 0, "directory is not open"); struct stat buf; @@ -677,7 +611,7 @@ bool DorisCompoundDirectory::openInput(const char* name, lucene::store::IndexInp void DorisCompoundDirectory::close() { if (useCompoundFileWriter) { - DorisCompoundFileWriter* cfsWriter = _CLNEW DorisCompoundFileWriter(this); + auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this); // write compound file cfsWriter->writeCompoundFile(); // delete index path, which contains separated inverted index files @@ -705,7 +639,7 @@ bool DorisCompoundDirectory::deleteDirectory() { void DorisCompoundDirectory::renameFile(const char* from, const char* to) { CND_PRECONDITION(directory[0] != 0, "directory is not open"); - std::lock_guard<doris::Mutex> wlock(_this_lock); + std::lock_guard<std::mutex> wlock(_this_lock); char old[CL_MAX_DIR]; priv_getFN(old, from); @@ -733,7 +667,7 @@ lucene::store::IndexOutput* DorisCompoundDirectory::createOutput(const char* nam LOG_AND_THROW_IF_ERROR(fs->exists(fl, &exists), "Create output file exists IO error") assert(!exists); } - auto ret = _CLNEW FSIndexOutput(); + auto* ret = _CLNEW FSIndexOutput(); try { ret->init(fs, fl); } catch (CLuceneError& err) { @@ -747,5 +681,226 @@ std::string DorisCompoundDirectory::toString() const { return std::string("DorisCompoundDirectory@") + this->directory; } -} // namespace segment_v2 -} // namespace doris +/** + * DorisRAMCompoundDirectory + */ +DorisRAMCompoundDirectory::DorisRAMCompoundDirectory() { + filesMap = _CLNEW FileMap(true, true); + this->sizeInBytes = 0; +} + +DorisRAMCompoundDirectory::~DorisRAMCompoundDirectory() { + _CLDELETE(lockFactory); + _CLDELETE(filesMap); +} + +void DorisRAMCompoundDirectory::init(const io::FileSystemSPtr& _fs, const char* _path, + bool use_compound_file_writer, + lucene::store::LockFactory* lock_factory, + const io::FileSystemSPtr& cfs, const char* cfs_path) { + fs = _fs; + directory = _path; + useCompoundFileWriter = use_compound_file_writer; + + if (cfs == nullptr) { + compound_fs = fs; + } else { + compound_fs = cfs; + } + if (cfs_path != nullptr) { + cfs_directory = cfs_path; + } else { + cfs_directory = _path; + } + + lucene::store::Directory::setLockFactory(_CLNEW lucene::store::SingleInstanceLockFactory()); +} + +bool DorisRAMCompoundDirectory::list(std::vector<std::string>* names) const { + std::lock_guard<std::mutex> wlock(_this_lock); + auto itr = filesMap->begin(); + while (itr != filesMap->end()) { + names->emplace_back(itr->first); + ++itr; + } + return true; +} + +bool DorisRAMCompoundDirectory::fileExists(const char* name) const { + std::lock_guard<std::mutex> wlock(_this_lock); + return filesMap->exists((char*)name); +} + +int64_t DorisRAMCompoundDirectory::fileModified(const char* name) const { + std::lock_guard<std::mutex> wlock(_this_lock); + auto* f = filesMap->get((char*)name); + return f->getLastModified(); +} + +void DorisRAMCompoundDirectory::touchFile(const char* name) { + lucene::store::RAMFile* file = nullptr; + { + std::lock_guard<std::mutex> wlock(_this_lock); + file = filesMap->get((char*)name); + } + const uint64_t ts1 = file->getLastModified(); + uint64_t ts2 = lucene::util::Misc::currentTimeMillis(); + + //make sure that the time has actually changed + while (ts1 == ts2) { + _LUCENE_SLEEP(1); + ts2 = lucene::util::Misc::currentTimeMillis(); + }; + + file->setLastModified(ts2); +} + +int64_t DorisRAMCompoundDirectory::fileLength(const char* name) const { + std::lock_guard<std::mutex> wlock(_this_lock); + auto* f = filesMap->get((char*)name); + return f->getLength(); +} + +bool DorisRAMCompoundDirectory::openInput(const char* name, lucene::store::IndexInput*& ret, + CLuceneError& error, int32_t bufferSize) { + std::lock_guard<std::mutex> wlock(_this_lock); + auto* file = filesMap->get((char*)name); + if (file == nullptr) { + error.set(CL_ERR_IO, + "[DorisRAMCompoundDirectory::open] The requested file does not exist."); + return false; + } + ret = _CLNEW lucene::store::RAMInputStream(file); + return true; +} + +void DorisRAMCompoundDirectory::close() { + // write compound file + DorisCompoundDirectory::close(); + + std::lock_guard<std::mutex> wlock(_this_lock); + filesMap->clear(); + _CLDELETE(filesMap); +} + +bool DorisRAMCompoundDirectory::doDeleteFile(const char* name) { + std::lock_guard<std::mutex> wlock(_this_lock); + auto itr = filesMap->find((char*)name); + if (itr != filesMap->end()) { + SCOPED_LOCK_MUTEX(this->THIS_LOCK); + sizeInBytes -= itr->second->sizeInBytes; + filesMap->removeitr(itr); + return true; + } else { + return false; + } +} + +bool DorisRAMCompoundDirectory::deleteDirectory() { + // do nothing, RAM dir do not have actual files + return true; +} + +void DorisRAMCompoundDirectory::renameFile(const char* from, const char* to) { + std::lock_guard<std::mutex> wlock(_this_lock); + auto itr = filesMap->find((char*)from); + + /* DSR:CL_BUG_LEAK: + ** If a file named $to already existed, its old value was leaked. + ** My inclination would be to prevent this implicit deletion with an + ** exception, but it happens routinely in CLucene's internals (e.g., during + ** IndexWriter.addIndexes with the file named 'segments'). */ + if (filesMap->exists((char*)to)) { + auto itr1 = filesMap->find((char*)to); + SCOPED_LOCK_MUTEX(this->THIS_LOCK); + sizeInBytes -= itr1->second->sizeInBytes; + filesMap->removeitr(itr1); + } + if (itr == filesMap->end()) { + char tmp[1024]; + snprintf(tmp, 1024, "cannot rename %s, file does not exist", from); + _CLTHROWT(CL_ERR_IO, tmp); + } + DCHECK(itr != filesMap->end()); + auto* file = itr->second; + filesMap->removeitr(itr, false, true); + filesMap->put(strdup(to), file); +} + +lucene::store::IndexOutput* DorisRAMCompoundDirectory::createOutput(const char* name) { + /* Check the $filesMap VoidMap to see if there was a previous file named + ** $name. If so, delete the old RAMFile object, but reuse the existing + ** char buffer ($n) that holds the filename. If not, duplicate the + ** supplied filename buffer ($name) and pass ownership of that memory ($n) + ** to $files. */ + std::lock_guard<std::mutex> wlock(_this_lock); + + // get the actual pointer to the output name + char* n = nullptr; + auto itr = filesMap->find(const_cast<char*>(name)); + if (itr != filesMap->end()) { + n = itr->first; + lucene::store::RAMFile* rf = itr->second; + SCOPED_LOCK_MUTEX(this->THIS_LOCK); + sizeInBytes -= rf->sizeInBytes; + _CLDELETE(rf); + } else { + n = STRDUP_AtoA(name); + } + + auto* file = _CLNEW lucene::store::RAMFile(); + (*filesMap)[n] = file; + + return _CLNEW lucene::store::RAMOutputStream(file); +} + +std::string DorisRAMCompoundDirectory::toString() const { + return std::string("DorisRAMCompoundDirectory@") + this->directory; +} + +const char* DorisRAMCompoundDirectory::getClassName() { + return "DorisRAMCompoundDirectory"; +} + +const char* DorisRAMCompoundDirectory::getObjectName() const { + return getClassName(); +} + +/** + * DorisCompoundDirectoryFactory + */ +DorisCompoundDirectory* DorisCompoundDirectoryFactory::getDirectory( + const io::FileSystemSPtr& _fs, const char* _file, bool use_compound_file_writer, + bool can_use_ram_dir, lucene::store::LockFactory* lock_factory, + const io::FileSystemSPtr& _cfs, const char* _cfs_file) { + const char* cfs_file = _cfs_file; + if (cfs_file == nullptr) { + cfs_file = _file; + } + DorisCompoundDirectory* dir = nullptr; + if (!_file || !*_file) { + _CLTHROWA(CL_ERR_IO, "Invalid directory"); + } + + const char* file = _file; + + // Write by RAM directory + // 1. only write separated index files, which is can_use_ram_dir = true. + // 2. config::inverted_index_ram_dir_enable = true + if (config::inverted_index_ram_dir_enable && can_use_ram_dir) { + dir = _CLNEW DorisRAMCompoundDirectory(); + } else { + bool exists = false; + LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error") + if (!exists) { + LOG_AND_THROW_IF_ERROR(_fs->create_directory(file), + "Get directory create directory IO error") + } + dir = _CLNEW DorisCompoundDirectory(); + } + dir->init(_fs, file, use_compound_file_writer, lock_factory, _cfs, cfs_file); + + return dir; +} + +} // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h index b06f980f6bd..7ae0e618a45 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h @@ -21,6 +21,7 @@ #include <CLucene/store/Directory.h> #include <CLucene/store/IndexInput.h> #include <CLucene/store/IndexOutput.h> +#include <CLucene/store/_RAMDirectory.h> #include <stdint.h> #include <string> @@ -29,19 +30,15 @@ #include "CLucene/SharedHeader.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/file_system.h" -#include "util/lock.h" +#include "io/io_common.h" class CLuceneError; -namespace lucene { -namespace store { +namespace lucene::store { class LockFactory; -} // namespace store -} // namespace lucene +} // namespace lucene::store -namespace doris { - -namespace segment_v2 { +namespace doris::segment_v2 { class DorisCompoundFileWriter : LUCENE_BASE { public: @@ -61,26 +58,15 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory { private: int filemode; - doris::Mutex _this_lock; - protected: - DorisCompoundDirectory(); - virtual void init(const io::FileSystemSPtr& fs, const char* path, - lucene::store::LockFactory* lock_factory = nullptr, - const io::FileSystemSPtr& compound_fs = nullptr, - const char* cfs_path = nullptr); - void priv_getFN(char* buffer, const char* name) const; - -private: + mutable std::mutex _this_lock; io::FileSystemSPtr fs; io::FileSystemSPtr compound_fs; std::string directory; std::string cfs_directory; - void create(); - static bool disableLocks; bool useCompoundFileWriter {false}; -protected: + void priv_getFN(char* buffer, const char* name) const; /// Removes an existing file in the directory. bool doDeleteFile(const char* name) override; @@ -98,16 +84,6 @@ public: bool list(std::vector<std::string>* names) const override; bool fileExists(const char* name) const override; const char* getCfsDirName() const; - static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs, const char* file, - lucene::store::LockFactory* lock_factory = nullptr, - const io::FileSystemSPtr& cfs_fs = nullptr, - const char* cfs_file = nullptr); - - static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs, const char* file, - bool use_compound_file_writer, - const io::FileSystemSPtr& cfs_fs = nullptr, - const char* cfs_file = nullptr); - int64_t fileModified(const char* name) const override; int64_t fileLength(const char* name) const override; bool openInput(const char* name, lucene::store::IndexInput*& ret, CLuceneError& err, @@ -119,7 +95,77 @@ public: std::string toString() const override; static const char* getClassName(); const char* getObjectName() const override; - bool deleteDirectory(); + virtual bool deleteDirectory(); + + DorisCompoundDirectory(); + + virtual void init(const io::FileSystemSPtr& fs, const char* path, bool use_compound_file_writer, + lucene::store::LockFactory* lock_factory = nullptr, + const io::FileSystemSPtr& compound_fs = nullptr, + const char* cfs_path = nullptr); +}; + +class CLUCENE_EXPORT DorisRAMCompoundDirectory : public DorisCompoundDirectory { +protected: + using FileMap = + lucene::util::CLHashMap<char*, lucene::store::RAMFile*, lucene::util::Compare::Char, + lucene::util::Equals::Char, lucene::util::Deletor::acArray, + lucene::util::Deletor::Object<lucene::store::RAMFile>>; + + // unlike the java Hashtable, FileMap is not synchronized, and all access must be protected by a lock + FileMap* filesMap; + void init(const io::FileSystemSPtr& fs, const char* path, bool use_compound_file_writer, + lucene::store::LockFactory* lock_factory = nullptr, + const io::FileSystemSPtr& compound_fs = nullptr, + const char* cfs_path = nullptr) override; + +public: + int64_t sizeInBytes; + + /// Returns a null terminated array of strings, one for each file in the directory. + bool list(std::vector<std::string>* names) const override; + + /** Constructs an empty {@link Directory}. */ + DorisRAMCompoundDirectory(); + + ///Destructor - only call this if you are sure the directory + ///is not being used anymore. Otherwise use the ref-counting + ///facilities of dir->close + ~DorisRAMCompoundDirectory() override; + + bool doDeleteFile(const char* name) override; + + bool deleteDirectory() override; + + /// Returns true iff the named file exists in this directory. + bool fileExists(const char* name) const override; + + /// Returns the time the named file was last modified. + int64_t fileModified(const char* name) const override; + + /// Returns the length in bytes of a file in the directory. + int64_t fileLength(const char* name) const override; + + /// Removes an existing file in the directory. + void renameFile(const char* from, const char* to) override; + + /** Set the modified time of an existing file to now. */ + void touchFile(const char* name) override; + + /// Creates a new, empty file in the directory with the given name. + /// Returns a stream writing this file. + lucene::store::IndexOutput* createOutput(const char* name) override; + + /// Returns a stream reading an existing file. + bool openInput(const char* name, lucene::store::IndexInput*& ret, CLuceneError& error, + int32_t bufferSize = -1) override; + + void close() override; + + std::string toString() const override; + + static const char* getClassName(); + const char* getObjectName() const override; }; class DorisCompoundDirectory::FSIndexInput : public lucene::store::BufferedIndexInput { @@ -128,13 +174,13 @@ class DorisCompoundDirectory::FSIndexInput : public lucene::store::BufferedIndex io::FileReaderSPtr _reader; uint64_t _length; int64_t _fpos; - doris::Mutex* _shared_lock; + std::mutex* _shared_lock; char path[4096]; SharedHandle(const char* path); ~SharedHandle() override; }; - SharedHandle* _handle; + SharedHandle* _handle = nullptr; int64_t _pos; FSIndexInput(SharedHandle* handle, int32_t buffer_size) : BufferedIndexInput(buffer_size) { @@ -158,7 +204,7 @@ public: const char* getObjectName() const override { return getClassName(); } static const char* getClassName() { return "FSIndexInput"; } - doris::Mutex _this_lock; + std::mutex _this_lock; protected: // Random-access methods @@ -167,5 +213,16 @@ protected: void readInternal(uint8_t* b, const int32_t len) override; }; -} // namespace segment_v2 -} // namespace doris +/** + * Factory function to create DorisCompoundDirectory + */ +class DorisCompoundDirectoryFactory { +public: + static DorisCompoundDirectory* getDirectory(const io::FileSystemSPtr& fs, const char* file, + bool use_compound_file_writer = false, + bool can_use_ram_dir = false, + lucene::store::LockFactory* lock_factory = nullptr, + const io::FileSystemSPtr& cfs_fs = nullptr, + const char* cfs_file = nullptr); +}; +} // namespace doris::segment_v2 \ No newline at end of file diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index 6e1ebb53a9d..7934c63286d 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -197,7 +197,7 @@ Status InvertedIndexReader::read_null_bitmap(InvertedIndexQueryCacheHandle* cach if (!dir) { dir = new DorisCompoundReader( - DorisCompoundDirectory::getDirectory(_fs, index_dir.c_str()), + DorisCompoundDirectoryFactory::getDirectory(_fs, index_dir.c_str()), index_file_name.c_str(), config::inverted_index_read_buffer_size); owned_dir = true; } @@ -734,8 +734,8 @@ BkdIndexReader::BkdIndexReader(io::FileSystemSPtr fs, const std::string& path, } _file_full_path = index_file; _compoundReader = std::make_unique<DorisCompoundReader>( - DorisCompoundDirectory::getDirectory(fs, index_dir.c_str()), index_file_name.c_str(), - config::inverted_index_read_buffer_size); + DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()), + index_file_name.c_str(), config::inverted_index_read_buffer_size); } Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 744710d9082..d82f6f6b556 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -64,7 +64,6 @@ const int32_t MERGE_FACTOR = 100000000; const int32_t MAX_LEAF_COUNT = 1024; const float MAXMBSortInHeap = 512.0 * 8; const int DIMS = 1; -const std::string empty_value; template <FieldType field_type> class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { @@ -162,7 +161,10 @@ public: } _doc = std::make_unique<lucene::document::Document>(); - _dir.reset(DorisCompoundDirectory::getDirectory(_fs, index_path.c_str(), true)); + bool use_compound_file_writer = true; + bool can_use_ram_dir = true; + _dir.reset(DorisCompoundDirectoryFactory::getDirectory( + _fs, index_path.c_str(), use_compound_file_writer, can_use_ram_dir)); if (_parser_type == InvertedIndexParserType::PARSER_STANDARD || _parser_type == InvertedIndexParserType::PARSER_UNICODE) { @@ -296,7 +298,7 @@ public: get_parser_ignore_above_value_from_properties(_index_meta->properties()); auto ignore_above = std::stoi(ignore_above_value); for (int i = 0; i < count; ++i) { - // only ignore_above UNTOKENIZED strings + // only ignore_above UNTOKENIZED strings and empty strings not tokenized if ((_parser_type == InvertedIndexParserType::PARSER_NONE && v->get_size() > ignore_above) || (_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) { @@ -346,7 +348,7 @@ public: } auto value = join(strings, " "); - // only ignore_above UNTOKENIZED strings + // only ignore_above UNTOKENIZED strings and empty strings not tokenized if ((_parser_type == InvertedIndexParserType::PARSER_NONE && value.length() > ignore_above) || (_parser_type != InvertedIndexParserType::PARSER_NONE && value.empty())) { @@ -493,7 +495,10 @@ public: if constexpr (field_is_numeric_type(field_type)) { auto index_path = InvertedIndexDescriptor::get_temporary_index_path( _directory + "/" + _segment_file_name, _index_meta->index_id()); - dir = DorisCompoundDirectory::getDirectory(_fs, index_path.c_str(), true); + bool use_compound_file_writer = true; + bool can_use_ram_dir = true; + dir = DorisCompoundDirectoryFactory::getDirectory( + _fs, index_path.c_str(), use_compound_file_writer, can_use_ram_dir); write_null_bitmap(null_bitmap_out, dir); _bkd_writer->max_doc_ = _rid; _bkd_writer->docs_seen_ = _row_ids_seen_for_bkd; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org