xiaokang commented on code in PR #30145: URL: https://github.com/apache/doris/pull/30145#discussion_r1498520653
########## be/src/olap/rowset/segment_v2/inverted_index_writer.cpp: ########## @@ -542,66 +515,79 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { } Status finish() override { - std::unique_ptr<lucene::store::IndexOutput> null_bitmap_out = nullptr; - std::unique_ptr<lucene::store::IndexOutput> data_out = nullptr; - std::unique_ptr<lucene::store::IndexOutput> index_out = nullptr; - std::unique_ptr<lucene::store::IndexOutput> meta_out = nullptr; - try { - // write bkd file - if constexpr (field_is_numeric_type(field_type)) { - _bkd_writer->max_doc_ = _rid; - _bkd_writer->docs_seen_ = _row_ids_seen_for_bkd; - null_bitmap_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput( - InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str())); - data_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput( - InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str())); - meta_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput( - InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str())); - index_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput( - InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str())); - write_null_bitmap(null_bitmap_out.get()); - - DBUG_EXECUTE_IF("InvertedIndexWriter._set_bkd_data_out_nullptr", - { data_out = nullptr; }); - if (data_out != nullptr && meta_out != nullptr && index_out != nullptr) { - _bkd_writer->meta_finish(meta_out.get(), - _bkd_writer->finish(data_out.get(), index_out.get()), - int(field_type)); - } else { - LOG(WARNING) << "Inverted index writer create output error occurred: nullptr"; - _CLTHROWA(CL_ERR_IO, "Create output error with nullptr"); + if (_dir != nullptr) { Review Comment: we can check _dir == nullptr and return Status::Error and leave the original code unchanged. 
########## be/src/olap/rowset/segment_v2/inverted_index_writer.cpp: ########## @@ -111,22 +109,22 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { // open index searcher into cache auto mem_tracker = std::make_unique<MemTracker>("InvertedIndexSearcherCacheWithRead"); - io::Path index_dir(_directory); - auto index_file_name = InvertedIndexDescriptor::get_index_file_name( - _segment_file_name, _index_meta->index_id(), - _index_meta->get_index_suffix()); + auto index_file_path = _index_file_writer->get_index_file_path(_index_meta); Review Comment: Can it work for index format V1? ########## be/src/olap/rowset/segment_v2/inverted_index_writer.cpp: ########## @@ -616,21 +602,18 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr; std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr; std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr; - std::unique_ptr<DorisCompoundDirectory> _dir = nullptr; - std::string _segment_file_name; - std::string _directory; - io::FileSystemSPtr _fs; + DorisCompoundDirectory* _dir = nullptr; Review Comment: why not use unique_ptr? Is _dir owned and released by _index_file_writer? ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.h: ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <CLucene.h> // IWYU pragma: keep +#include <CLucene/store/IndexInput.h> +#include <gen_cpp/olap_file.pb.h> + +#include <string> +#include <vector> + +#include "io/fs/file_system.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" + +namespace doris { +class TabletIndex; + +namespace segment_v2 { +class DorisCompoundDirectory; +using InvertedIndexDirectoryMap = + std::map<std::pair<int64_t, std::string>, std::unique_ptr<lucene::store::Directory>>; + +class DorisCompoundFileWriter : LUCENE_BASE { +public: + DorisCompoundFileWriter() = default; + DorisCompoundFileWriter(CL_NS(store)::Directory* dir); + ~DorisCompoundFileWriter() override = default; + /** Returns the directory of the compound file. 
*/ + CL_NS(store)::Directory* getDirectory(); + virtual size_t writeCompoundFile(); + static void copyFile(const char* fileName, lucene::store::Directory* dir, + lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength); + +private: + class FileInfo { + public: + std::string filename; + int32_t filesize; + }; + + void sort_files(std::vector<FileInfo>& file_infos); + + CL_NS(store)::Directory* directory = nullptr; +}; + +class InvertedIndexFileWriter { +public: + InvertedIndexFileWriter(const io::FileSystemSPtr& fs, io::Path segment_file_dir, + std::string segment_file_name, + InvertedIndexStorageFormatPB storage_format) + : _fs(fs), + _index_file_dir(std::move(segment_file_dir)), + _index_file_name(std::move(segment_file_name)), Review Comment: It's confusing to assign segment_file_name to _index_file_name ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index 
exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); Review Comment: Does index dir need to be deleted? ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index 
exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id + header_size += 4; // index suffix name size + header_size += suffix.length(); // index suffix name + header_size += sizeof(int); // index file count + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + for (auto file : files) { + header_size += 4; // file name size + header_size += file.length(); // file name + header_size += 8; // file offset + header_size += 8; // file size + } + } + return header_size; +} + +Status InvertedIndexFileWriter::close() { + if (_indices_dirs.empty()) { + return Status::OK(); + } + try { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& entry : _indices_dirs) { + auto index_id = entry.first.first; + auto index_suffix = entry.first.second; + const auto& dir = entry.second; + auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get()); + // write compound file + _file_size += cfsWriter->writeCompoundFile(); + // delete index path, which contains separated inverted index files + if (std::string(dir->getObjectName()) == "DorisCompoundDirectory") { + auto temp_dir = InvertedIndexDescriptor::get_temporary_index_path( + _index_file_dir / _index_file_name, index_id, index_suffix); + auto* compound_dir = static_cast<DorisCompoundDirectory*>(dir.get()); + if (compound_dir->getDirName() == temp_dir) { + 
compound_dir->deleteDirectory(); Review Comment: check return value ########## be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp: ########## @@ -0,0 +1,266 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_reader.h" + +#include <memory> +#include <utility> + +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_compound_reader.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +Status InvertedIndexFileReader::init(int32_t read_buffer_size, bool open_idx_file_cache) { + _read_buffer_size = read_buffer_size; + _open_idx_file_cache = open_idx_file_cache; + if (_storage_format == InvertedIndexStorageFormatPB::V2) { + return _init_from_v2(read_buffer_size); + } else { + return Status::OK(); + } +} + +Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) { + auto index_file_full_path = _index_file_dir / _index_file_name; + try { + bool exists = false; + RETURN_IF_ERROR(_fs->exists(index_file_full_path, &exists)); + if (!exists) { + return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( + "inverted index file {} is not found", 
index_file_full_path.native()); + } + int64_t file_size = 0; + RETURN_IF_ERROR(_fs->file_size(index_file_full_path, &file_size)); + if (file_size == 0) { + LOG(WARNING) << "inverted index file " << index_file_full_path << " is empty."; + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "inverted index file {} is empty", index_file_full_path.native()); + } + + CLuceneError err; + CL_NS(store)::IndexInput* index_input = nullptr; + auto ok = DorisCompoundDirectory::FSIndexInput::open(_fs, index_file_full_path.c_str(), + index_input, err, read_buffer_size); + if (!ok) { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when open idx file {}, error msg: {}", + index_file_full_path.native(), err.what()); + } + index_input->setIdxFileCache(_open_idx_file_cache); + _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input); + int32_t version = _stream->readInt(); // Read version number + if (version == InvertedIndexStorageFormatPB::V2) { + DCHECK(version == _storage_format); + int32_t numIndices = _stream->readInt(); // Read number of indices + ReaderFileEntry* entry = nullptr; + + for (int32_t i = 0; i < numIndices; ++i) { + int64_t indexId = _stream->readInt(); // Read index ID + int32_t suffix_length = _stream->readInt(); // Read suffix length + std::vector<uint8_t> suffix_data(suffix_length); + _stream->readBytes(suffix_data.data(), suffix_length); + std::string suffix_str(suffix_data.begin(), suffix_data.end()); + + int32_t numFiles = _stream->readInt(); // Read number of files in the index + + // true, true means it will deconstruct key and value + auto fileEntries = std::make_unique<EntriesType>(true, true); + + for (int32_t j = 0; j < numFiles; ++j) { + entry = _CLNEW ReaderFileEntry(); + + int32_t file_name_length = _stream->readInt(); + // aid will destruct in EntriesType map. 
+ char* aid = (char*)malloc(file_name_length + 1); + _stream->readBytes(reinterpret_cast<uint8_t*>(aid), file_name_length); + aid[file_name_length] = '\0'; + //stream->readString(tid, CL_MAX_PATH); + entry->file_name = std::string(aid, file_name_length); + entry->offset = _stream->readLong(); + entry->length = _stream->readLong(); + + fileEntries->put(aid, entry); + } + + _indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)), + std::move(fileEntries)); + } + } else { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "unknown inverted index format {}", version); + } + } catch (CLuceneError& err) { + if (_stream != nullptr) { + try { + _stream->close(); + } catch (CLuceneError& err) { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when close idx file {}, error msg: {}", + index_file_full_path.native(), err.what()); + } + } + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when init idx file {}, error msg: {}", + index_file_full_path.native(), err.what()); + } + return Status::OK(); +} + +Result<InvertedIndexDirectoryMap> InvertedIndexFileReader::get_all_directories() { + InvertedIndexDirectoryMap res; + for (auto& index : _indices_entries) { + auto index_id = index.first.first; + auto index_suffix = index.first.second; + LOG(INFO) << "index_id:" << index_id << " index_suffix:" << index_suffix; + auto ret = _open(index_id, index_suffix); + if (!ret.has_value()) { + return ResultError(ret.error()); + } + res.emplace(std::make_pair(index_id, index_suffix), std::move(ret.value())); + } + return res; +} + +Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open( + int64_t index_id, std::string& index_suffix) const { + std::unique_ptr<DorisCompoundReader> compound_reader; + + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + DorisCompoundDirectory* dir = nullptr; + auto file_name = 
InvertedIndexDescriptor::get_index_file_name(_segment_file_name, index_id, + index_suffix); + try { + dir = DorisCompoundDirectoryFactory::getDirectory(_fs, _index_file_dir.c_str()); + + compound_reader = std::make_unique<DorisCompoundReader>( + dir, file_name.c_str(), _read_buffer_size, _open_idx_file_cache); + } catch (CLuceneError& err) { + if (dir != nullptr) { + dir->close(); + _CLDELETE(dir) + } + return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when open idx file {}, error msg: {}", + (_index_file_dir / file_name).native(), err.what())); + } + } else { + if (_stream == nullptr) { + return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( + "CLuceneError occur when open idx file {}, stream is nullptr", + (_index_file_dir / _index_file_name).native())); + } + + // Check if the specified index exists + auto index_it = _indices_entries.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_entries.end()) { + std::ostringstream errMsg; + errMsg << "No index with id " << index_id << " found"; + return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when open idx file {}, error msg: {}", + (_index_file_dir / _index_file_name).native(), errMsg.str())); + } + // Need to clone resource here, because index searcher cache need it. 
+ bool own_index_input = true; + compound_reader = std::make_unique<DorisCompoundReader>( + _stream->clone(), index_it->second.get(), own_index_input, _read_buffer_size); + } + return compound_reader; +} +Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::open( + const TabletIndex* index_meta) const { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + return _open(index_id, index_suffix); +} + +std::string InvertedIndexFileReader::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _segment_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileReader::index_file_exist(const TabletIndex* index_meta, bool* res) const { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + auto index_file_path = _index_file_dir / InvertedIndexDescriptor::get_index_file_name( + _segment_file_name, index_meta->index_id(), + index_meta->get_index_suffix()); + return _fs->exists(index_file_path, res); + } else { + if (_stream == nullptr) { + *res = false; + return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( + "idx file {} is not opened", (_index_file_dir / _index_file_name).native()); + } + // Check if the specified index exists + auto index_it = _indices_entries.find( + std::make_pair(index_meta->index_id(), index_meta->get_index_suffix())); + if (index_it == _indices_entries.end()) { + *res = false; + } else { + *res = true; + } + } + return Status::OK(); +} + +Status InvertedIndexFileReader::has_null(const TabletIndex* index_meta, bool* res) const { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + *res = true; + return Status::OK(); + } + if (_stream == nullptr) { + return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( + "idx file {} is not opened", (_index_file_dir / _index_file_name).native()); + } + // Check if the specified index exists + auto 
index_it = _indices_entries.find( + std::make_pair(index_meta->index_id(), index_meta->get_index_suffix())); + if (index_it == _indices_entries.end()) { + *res = false; + } else { + auto null_bitmap_file_name = InvertedIndexDescriptor::get_temporary_null_bitmap_file_name(); + auto* entries = index_it->second.get(); + ReaderFileEntry* e = entries->get((char*)(null_bitmap_file_name.c_str())); + if (e == nullptr) { + *res = false; + return Status::OK(); + } + // roaring bitmap cookie header size is 5 + if (e->length <= 5) { + *res = false; + } else { + *res = true; Review Comment: Is it enough to judge has_null as true? ########## be/src/olap/task/index_builder.h: ########## @@ -59,6 +60,7 @@ class IndexBuilder { private: Status _write_inverted_index_data(TabletSchemaSPtr tablet_schema, int32_t segment_idx, vectorized::Block* block); + Status handle_inverted_index_update(); Review Comment: private function name: _handle_inverted_index_update() ########## be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp: ########## @@ -80,19 +59,19 @@ Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_nu for (auto* d : src_index_dirs) { Review Comment: If src_index_dirs and dst_index_dirs are not owned here, close() can also be moved to owner. 
########## be/src/olap/task/index_builder.cpp: ########## @@ -69,12 +69,28 @@ Status IndexBuilder::update_inverted_index_info() { output_rs_tablet_schema->copy_from(*input_rs_tablet_schema); if (_is_drop_op) { size_t total_index_size = 0; + auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); + size_t index_size = 0; + RETURN_IF_ERROR(beta_rowset->get_inverted_index_size(&index_size)); + total_index_size += index_size; for (const auto& t_inverted_index : _alter_inverted_indexes) { - auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); - size_t index_size = 0; - RETURN_IF_ERROR(beta_rowset->get_inverted_index_size_by_index_id( - t_inverted_index.index_id, &index_size)); - total_index_size += index_size; + DCHECK_EQ(t_inverted_index.columns.size(), 1); + auto column_name = t_inverted_index.columns[0]; + auto column_idx = output_rs_tablet_schema->field_index(column_name); + if (column_idx < 0) { + LOG(WARNING) << "referenced column was missing. " + << "[column=" << column_name << " referenced_column=" << column_idx + << "]"; + continue; Review Comment: mark for error handling ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = 
index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id + header_size += 4; // index suffix name size + header_size += suffix.length(); // index suffix name + header_size += sizeof(int); // index file count + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + for (auto file : files) { + header_size += 4; // file name size + header_size += file.length(); // file name + header_size += 8; // file offset + header_size += 8; // file size + } + } + return header_size; +} + +Status InvertedIndexFileWriter::close() { + if (_indices_dirs.empty()) { + return Status::OK(); + } + try { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& entry : _indices_dirs) { Review Comment: Maybe we can write compound idx file after each column finish for V1 format. 
########## be/src/olap/rowset/segment_v2/column_reader.cpp: ########## @@ -550,32 +552,34 @@ Status ColumnReader::_load_inverted_index_index(const TabletIndex* index_meta) { if (is_string_type(type)) { if (parser_type != InvertedIndexParserType::PARSER_NONE) { try { - _inverted_index = FullTextIndexReader::create_shared( - _file_reader->fs(), _file_reader->path().native(), index_meta); + _inverted_index = FullTextIndexReader::create_shared(index_meta, index_file_reader); } catch (const CLuceneError& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "create FullTextIndexReader error: {}", e.what()); } } else { try { - _inverted_index = StringTypeInvertedIndexReader::create_shared( - _file_reader->fs(), _file_reader->path().native(), index_meta); + _inverted_index = + StringTypeInvertedIndexReader::create_shared(index_meta, index_file_reader); } catch (const CLuceneError& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "create StringTypeInvertedIndexReader error: {}", e.what()); } } } else if (is_numeric_type(type)) { try { - _inverted_index = BkdIndexReader::create_shared( - _file_reader->fs(), _file_reader->path().native(), index_meta); + _inverted_index = BkdIndexReader::create_shared(index_meta, index_file_reader); } catch (const CLuceneError& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "create BkdIndexReader error: {}", e.what()); } } else { _inverted_index.reset(); } + // TODO: move has null to inverted_index_reader's query function Review Comment: in case the TODO is missed ########## be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h: ########## @@ -188,19 +170,21 @@ class DorisCompoundDirectory::FSIndexInput : public lucene::store::BufferedIndex io::FileReaderSPtr _reader; uint64_t _length; int64_t _fpos; - std::mutex* _shared_lock = nullptr; + std::mutex _shared_lock; Review Comment: why change this lock? 
########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool 
exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id Review Comment: use consistent 4/8 or sizeof(int/size_t) ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return 
ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id + header_size += 4; // index suffix name size + header_size += suffix.length(); // index suffix name + header_size += sizeof(int); // index file count + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + for (auto file : files) { + header_size += 4; // file name size + header_size += file.length(); // file name + header_size += 8; // file offset + header_size += 8; // file size + } + } + return header_size; +} + +Status InvertedIndexFileWriter::close() { + if (_indices_dirs.empty()) { + return Status::OK(); + } + try { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& entry : _indices_dirs) { + auto index_id = 
entry.first.first; + auto index_suffix = entry.first.second; + const auto& dir = entry.second; + auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get()); + // write compound file + _file_size += cfsWriter->writeCompoundFile(); + // delete index path, which contains separated inverted index files + if (std::string(dir->getObjectName()) == "DorisCompoundDirectory") { Review Comment: change magic string literal to a const ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index 
exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id + header_size += 4; // index suffix name size + header_size += suffix.length(); // index suffix name + header_size += sizeof(int); // index file count + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + for (auto file : files) { + header_size += 4; // file name size + header_size += file.length(); // file name + header_size += 8; // file offset + header_size += 8; // file size + } + } + return header_size; +} + +Status InvertedIndexFileWriter::close() { + if (_indices_dirs.empty()) { + return Status::OK(); + } + try { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& entry : _indices_dirs) { Review Comment: It may cause poorer IO performance than before due to os page cache. ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + 
_indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id + header_size += 4; // index suffix name size + header_size += suffix.length(); // index suffix name + header_size += sizeof(int); // index file count + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + for (auto file : files) { + header_size += 4; // file name size + header_size += file.length(); // file name + header_size += 8; // file offset + header_size += 8; // file size + } + } + return header_size; +} + +Status InvertedIndexFileWriter::close() { + if (_indices_dirs.empty()) { + return Status::OK(); + } + try { + if (_storage_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& entry : _indices_dirs) { + auto index_id = entry.first.first; + auto index_suffix = entry.first.second; + const auto& dir = entry.second; + auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get()); + // write compound 
file + _file_size += cfsWriter->writeCompoundFile(); + // delete index path, which contains separated inverted index files + if (std::string(dir->getObjectName()) == "DorisCompoundDirectory") { + auto temp_dir = InvertedIndexDescriptor::get_temporary_index_path( + _index_file_dir / _index_file_name, index_id, index_suffix); + auto* compound_dir = static_cast<DorisCompoundDirectory*>(dir.get()); + if (compound_dir->getDirName() == temp_dir) { + compound_dir->deleteDirectory(); + } else { + LOG(ERROR) << "try to delete wrong inverted index temporal directory, " + "wrong path is " + << compound_dir->getDirName() << " actual needed: " << temp_dir; + } + } + _CLDELETE(cfsWriter) + } + } else { + _file_size = write(); + for (const auto& entry : _indices_dirs) { + auto index_id = entry.first.first; + auto index_suffix = entry.first.second; + const auto& dir = entry.second; + // delete index path, which contains separated inverted index files + if (std::strcmp(dir->getObjectName(), "DorisCompoundDirectory") == 0) { + auto temp_dir = InvertedIndexDescriptor::get_temporary_index_path( + _index_file_dir / _index_file_name, index_id, index_suffix); + auto* compound_dir = static_cast<DorisCompoundDirectory*>(dir.get()); + if (compound_dir->getDirName() == temp_dir) { + compound_dir->deleteDirectory(); + } else { + LOG(ERROR) << "try to delete wrong inverted index temporal directory, " + "wrong path is " + << compound_dir->getDirName() << " actual needed: " << temp_dir; + } + } + } + } + } catch (CLuceneError& err) { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when close idx file {}, error msg: {}", + InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name), + err.what()); + } + return Status::OK(); +} Review Comment: add a new line ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/inverted_index_file_writer.h" + +#include "common/status.h" +#include "io/fs/file_writer.h" +#include "olap/rowset/segment_v2/inverted_index_compound_directory.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" +#include "olap/tablet_schema.h" + +namespace doris::segment_v2 { + +std::string InvertedIndexFileWriter::get_index_file_path(const TabletIndex* index_meta) const { + return InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name, + index_meta->index_id(), + index_meta->get_index_suffix()); +} + +Status InvertedIndexFileWriter::initialize(InvertedIndexDirectoryMap& indices_dirs) { + _indices_dirs = std::move(indices_dirs); + return Status::OK(); +} + +Result<DorisCompoundDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index_meta) { + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + auto index_path = InvertedIndexDescriptor::get_temporary_index_path( + (_index_file_dir / _index_file_name).native(), index_id, index_suffix); + + bool exists = false; + auto st = _fs->exists(index_path.c_str(), &exists); + if (!st.ok()) { + LOG(ERROR) << "index_path:" << index_path << " exists error:" << st; + return 
ResultError(st); + } + if (exists) { + LOG(ERROR) << "try to init a directory:" << index_path << " already exists"; + return ResultError(Status::InternalError("init_fulltext_index directory already exists")); + } + auto* dir = DorisCompoundDirectoryFactory::getDirectory(_fs, index_path.c_str()); + _indices_dirs.emplace(std::make_pair(index_id, index_suffix), + std::unique_ptr<DorisCompoundDirectory>(dir)); + return dir; +} + +Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + if (!index_meta) { + return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); + } + + auto index_id = index_meta->index_id(); + auto index_suffix = index_meta->get_index_suffix(); + + // Check if the specified index exists + auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + if (index_it == _indices_dirs.end()) { + std::ostringstream errMsg; + errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix + << " found."; + LOG(WARNING) << errMsg.str(); + return Status::OK(); + } + + _indices_dirs.erase(index_it); + return Status::OK(); +} + +size_t InvertedIndexFileWriter::headerLength() { + size_t header_size = 0; + header_size += + sizeof(int) * 2; // Account for the size of the version number and number of indices + for (const auto& entry : _indices_dirs) { + auto suffix = entry.first.second; + header_size += sizeof(int); // index id + header_size += 4; // index suffix name size + header_size += suffix.length(); // index suffix name + header_size += sizeof(int); // index file count + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + for (auto file : files) { + header_size += 4; // file name size + header_size += file.length(); // file name + header_size += 8; // file offset + header_size += 8; // file size + } + } + return header_size; +} + +Status InvertedIndexFileWriter::close() { + if (_indices_dirs.empty()) { + return Status::OK(); + } + try { + if 
(_storage_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& entry : _indices_dirs) { + auto index_id = entry.first.first; + auto index_suffix = entry.first.second; + const auto& dir = entry.second; + auto* cfsWriter = _CLNEW DorisCompoundFileWriter(dir.get()); + // write compound file + _file_size += cfsWriter->writeCompoundFile(); + // delete index path, which contains separated inverted index files + if (std::string(dir->getObjectName()) == "DorisCompoundDirectory") { + auto temp_dir = InvertedIndexDescriptor::get_temporary_index_path( + _index_file_dir / _index_file_name, index_id, index_suffix); + auto* compound_dir = static_cast<DorisCompoundDirectory*>(dir.get()); + if (compound_dir->getDirName() == temp_dir) { + compound_dir->deleteDirectory(); + } else { + LOG(ERROR) << "try to delete wrong inverted index temporal directory, " + "wrong path is " + << compound_dir->getDirName() << " actual needed: " << temp_dir; + } + } + _CLDELETE(cfsWriter) + } + } else { + _file_size = write(); + for (const auto& entry : _indices_dirs) { + auto index_id = entry.first.first; + auto index_suffix = entry.first.second; + const auto& dir = entry.second; + // delete index path, which contains separated inverted index files + if (std::strcmp(dir->getObjectName(), "DorisCompoundDirectory") == 0) { + auto temp_dir = InvertedIndexDescriptor::get_temporary_index_path( + _index_file_dir / _index_file_name, index_id, index_suffix); + auto* compound_dir = static_cast<DorisCompoundDirectory*>(dir.get()); + if (compound_dir->getDirName() == temp_dir) { + compound_dir->deleteDirectory(); + } else { + LOG(ERROR) << "try to delete wrong inverted index temporal directory, " + "wrong path is " + << compound_dir->getDirName() << " actual needed: " << temp_dir; + } + } + } + } + } catch (CLuceneError& err) { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when close idx file {}, error msg: {}", + 
InvertedIndexDescriptor::get_index_file_name(_index_file_dir / _index_file_name), + err.what()); + } + return Status::OK(); +} +size_t InvertedIndexFileWriter::write() { + // Create the output stream to write the compound file + int64_t current_offset = headerLength(); + std::string idx_name = InvertedIndexDescriptor::get_index_file_name(_index_file_name); + auto* out_dir = DorisCompoundDirectoryFactory::getDirectory(_fs, _index_file_dir.c_str()); + + auto compound_file_output = + std::unique_ptr<lucene::store::IndexOutput>(out_dir->createOutput(idx_name.c_str())); + + // Write the version number + compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2); + + // Write the number of indices + const auto numIndices = static_cast<uint32_t>(_indices_dirs.size()); + compound_file_output->writeInt(numIndices); + + std::vector<std::tuple<std::string, int64_t, int64_t, CL_NS(store)::Directory*>> + file_metadata; // Store file name, offset, file length, and corresponding directory + + // First, write all index information and file metadata + for (const auto& entry : _indices_dirs) { + const int64_t index_id = entry.first.first; + const auto& index_suffix = entry.first.second; + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + auto it = std::find(files.begin(), files.end(), DorisCompoundDirectory::WRITE_LOCK_FILE); + if (it != files.end()) { + files.erase(it); + } + // sort file list by file length + std::vector<std::pair<std::string, int64_t>> sorted_files; + for (auto file : files) { + sorted_files.emplace_back(file, dir->fileLength(file.c_str())); + } + // TODO: need to optimize + std::sort(sorted_files.begin(), sorted_files.end(), + [](const std::pair<std::string, int64_t>& a, + const std::pair<std::string, int64_t>& b) { return (a.second < b.second); }); + + int32_t file_count = sorted_files.size(); + + // Write the index ID and the number of files + compound_file_output->writeInt(index_id); + const auto* 
index_suffix_str = reinterpret_cast<const uint8_t*>(index_suffix.c_str()); + compound_file_output->writeInt(index_suffix.length()); + compound_file_output->writeBytes(index_suffix_str, index_suffix.length()); + compound_file_output->writeInt(file_count); + + // Calculate the offset for each file and write the file metadata + for (const auto& file : sorted_files) { + int64_t file_length = dir->fileLength(file.first.c_str()); + const auto* file_name = reinterpret_cast<const uint8_t*>(file.first.c_str()); + compound_file_output->writeInt(file.first.length()); + compound_file_output->writeBytes(file_name, file.first.length()); + compound_file_output->writeLong(current_offset); + compound_file_output->writeLong(file_length); + + file_metadata.emplace_back(file.first, current_offset, file_length, dir.get()); + current_offset += file_length; // Update the data offset + } + } + + const int64_t buffer_length = 16384; + uint8_t header_buffer[buffer_length]; + + // Next, write the file data Review Comment: So, small file written in header is not preserved. 
########## be/src/olap/rowset/segment_v2/inverted_index_reader.cpp: ########## @@ -223,30 +232,19 @@ Status InvertedIndexReader::handle_searcher_cache( } } -Status InvertedIndexReader::create_index_searcher(IndexSearcherPtr* searcher, io::FileSystemSPtr fs, - const io::Path& index_dir, - const std::string& index_file_name, +Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir, + IndexSearcherPtr* searcher, MemTracker* mem_tracker, InvertedIndexReaderType reader_type) { - auto index_file_path = index_dir / index_file_name; - bool exists = false; - RETURN_IF_ERROR(fs->exists(index_file_path, &exists)); - if (!exists) { - LOG(WARNING) << "inverted index: " << index_file_path << " not exist."; - return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>( - "inverted index input file {} not found", index_file_path.native()); - } SCOPED_CONSUME_MEM_TRACKER(mem_tracker); - bool open_idx_file_cache = true; - auto* directory = new DorisCompoundReader( - DorisCompoundDirectoryFactory::getDirectory(fs, index_dir.c_str()), - index_file_name.c_str(), config::inverted_index_read_buffer_size, open_idx_file_cache); - auto index_searcher_builder = DORIS_TRY(IndexSearcherBuilder::create_index_searcher_builder(reader_type)); - auto searcher_result = DORIS_TRY(index_searcher_builder->get_index_searcher(directory)); + auto searcher_result = DORIS_TRY(index_searcher_builder->get_index_searcher(dir)); *searcher = searcher_result; + if (std::string(dir->getObjectName()) == "DorisCompoundReader") { + static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIdxFileCache(false); Review Comment: why not cache? 
########## be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp: ########## @@ -641,16 +473,7 @@ bool DorisCompoundDirectory::openInput(const char* name, lucene::store::IndexInp return FSIndexInput::open(fs, fl, ret, error, bufferSize); } -void DorisCompoundDirectory::close() { - if (useCompoundFileWriter) { - auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this); - // write compound file - compound_file_size = cfsWriter->writeCompoundFile(); - // delete index path, which contains separated inverted index files - deleteDirectory(); - _CLDELETE(cfsWriter) - } -} +void DorisCompoundDirectory::close() {} Review Comment: Does output stream need to be closed here? ########## be/src/olap/rowset/segment_v2/inverted_index_reader.h: ########## @@ -179,18 +174,16 @@ class FullTextIndexReader : public InvertedIndexReader { const std::vector<std::string>& analyse_result, const FulltextIndexSearcherPtr& index_searcher, const std::shared_ptr<roaring::Roaring>& term_match_bitmap); - - void check_null_bitmap(const FulltextIndexSearcherPtr& index_searcher, Review Comment: mark ########## be/src/olap/rowset/segment_v2/inverted_index_compaction.h: ########## @@ -16,19 +16,23 @@ // under the License. #pragma once +#include <CLucene.h> + #include <cstdint> #include <string> #include <vector> #include "io/fs/file_system.h" namespace doris { - +class TabletIndex; namespace segment_v2 { -Status compact_column(int32_t index_id, int src_segment_num, int dest_segment_num, - std::vector<std::string> src_index_files, - std::vector<std::string> dest_index_files, const io::FileSystemSPtr& fs, - std::string index_writer_path, std::string tablet_path, +class InvertedIndexFileWriter; +class InvertedIndexFileReader; + +Status compact_column(int64_t index_id, std::vector<lucene::store::Directory*>& src_index_dirs, Review Comment: Is int32_t not enough? 
########## be/src/olap/task/index_builder.cpp: ########## @@ -69,12 +69,28 @@ Status IndexBuilder::update_inverted_index_info() { output_rs_tablet_schema->copy_from(*input_rs_tablet_schema); if (_is_drop_op) { size_t total_index_size = 0; + auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); + size_t index_size = 0; + RETURN_IF_ERROR(beta_rowset->get_inverted_index_size(&index_size)); + total_index_size += index_size; for (const auto& t_inverted_index : _alter_inverted_indexes) { - auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); - size_t index_size = 0; - RETURN_IF_ERROR(beta_rowset->get_inverted_index_size_by_index_id( - t_inverted_index.index_id, &index_size)); - total_index_size += index_size; + DCHECK_EQ(t_inverted_index.columns.size(), 1); + auto column_name = t_inverted_index.columns[0]; + auto column_idx = output_rs_tablet_schema->field_index(column_name); + if (column_idx < 0) { + LOG(WARNING) << "referenced column was missing. " + << "[column=" << column_name << " referenced_column=" << column_idx + << "]"; + continue; + } + auto column = output_rs_tablet_schema->column(column_idx); + const auto* index_meta = output_rs_tablet_schema->get_inverted_index(column); + if (index_meta == nullptr) { + LOG(ERROR) << "failed to find column: " << column_name + << " index_id: " << t_inverted_index.index_id; + continue; Review Comment: mark for error handling ########## be/src/olap/task/index_builder.h: ########## @@ -59,6 +60,7 @@ class IndexBuilder { private: Status _write_inverted_index_data(TabletSchemaSPtr tablet_schema, int32_t segment_idx, vectorized::Block* block); + Status handle_inverted_index_update(); Review Comment: it's not used anywhere. ########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.h: ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <CLucene.h> // IWYU pragma: keep +#include <CLucene/store/IndexInput.h> +#include <gen_cpp/olap_file.pb.h> + +#include <string> +#include <vector> + +#include "io/fs/file_system.h" +#include "olap/rowset/segment_v2/inverted_index_desc.h" + +namespace doris { +class TabletIndex; + +namespace segment_v2 { +class DorisCompoundDirectory; +using InvertedIndexDirectoryMap = + std::map<std::pair<int64_t, std::string>, std::unique_ptr<lucene::store::Directory>>; + +class DorisCompoundFileWriter : LUCENE_BASE { +public: + DorisCompoundFileWriter() = default; + DorisCompoundFileWriter(CL_NS(store)::Directory* dir); + ~DorisCompoundFileWriter() override = default; + /** Returns the directory of the compound file. 
*/ + CL_NS(store)::Directory* getDirectory(); + virtual size_t writeCompoundFile(); + static void copyFile(const char* fileName, lucene::store::Directory* dir, + lucene::store::IndexOutput* output, uint8_t* buffer, int64_t bufferLength); + +private: + class FileInfo { + public: + std::string filename; + int32_t filesize; + }; + + void sort_files(std::vector<FileInfo>& file_infos); + + CL_NS(store)::Directory* directory = nullptr; +}; + +class InvertedIndexFileWriter { +public: + InvertedIndexFileWriter(const io::FileSystemSPtr& fs, io::Path segment_file_dir, + std::string segment_file_name, + InvertedIndexStorageFormatPB storage_format) + : _fs(fs), + _index_file_dir(std::move(segment_file_dir)), + _index_file_name(std::move(segment_file_name)), Review Comment: suggest _segment_file_name, the same as Reader -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org