This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new a4a191fe56c [fix](index compaction)Fix MOW index compaction core (#32121) (#32657) a4a191fe56c is described below commit a4a191fe56c9d2b5f0d46315f8079de82a4a65ac Author: qiye <jianliang5...@gmail.com> AuthorDate: Fri Mar 22 14:20:19 2024 +0800 [fix](index compaction)Fix MOW index compaction core (#32121) (#32657) --- be/src/clucene | 2 +- be/src/common/config.cpp | 2 + be/src/common/config.h | 2 + be/src/index-tools/index_tool.cpp | 141 +++++++++++++++++++++++++++++++++++++- be/src/olap/compaction.cpp | 42 ++++++++++++ be/src/olap/tablet_meta.cpp | 3 +- 6 files changed, 186 insertions(+), 6 deletions(-) diff --git a/be/src/clucene b/be/src/clucene index c5ba0a26e9c..ef95e67ae31 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit c5ba0a26e9cab11a85dc3c5854e9ad258fa4fdf5 +Subproject commit ef95e67ae3123409f006072194f742a079603159 diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 6bb6d52d834..ab7bd39e034 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1024,6 +1024,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096"); DEFINE_Int32(max_depth_in_bkd_tree, "32"); // index compaction DEFINE_mBool(inverted_index_compaction_enable, "false"); +// Only for debug, do not use in production +DEFINE_mBool(debug_inverted_index_compaction, "false"); // index by RAM directory DEFINE_mBool(inverted_index_ram_dir_enable, "false"); // use num_broadcast_buffer blocks as buffer to do broadcast diff --git a/be/src/common/config.h b/be/src/common/config.h index 85db11b9e61..4aaed552d89 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1061,6 +1061,8 @@ DECLARE_Int32(inverted_index_read_buffer_size); DECLARE_Int32(max_depth_in_bkd_tree); // index compaction DECLARE_mBool(inverted_index_compaction_enable); +// Only for debug, do not use in production +DECLARE_mBool(debug_inverted_index_compaction); // index by RAM 
directory DECLARE_mBool(inverted_index_ram_dir_enable); // use num_broadcast_buffer blocks as buffer to do broadcast diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp index 3f541caf680..62593c11785 100644 --- a/be/src/index-tools/index_tool.cpp +++ b/be/src/index-tools/index_tool.cpp @@ -25,6 +25,7 @@ #include <fstream> #include <iostream> #include <memory> +#include <nlohmann/json.hpp> #include <roaring/roaring.hh> #include <sstream> #include <string> @@ -62,7 +63,8 @@ using namespace lucene::index; using namespace lucene::util; using namespace lucene::search; -DEFINE_string(operation, "", "valid operation: show_nested_files,check_terms,term_query"); +DEFINE_string(operation, "", + "valid operation: show_nested_files,check_terms,term_query,debug_index_compaction"); DEFINE_string(directory, "./", "inverted index file directory"); DEFINE_string(idx_file_name, "", "inverted index file name"); @@ -72,19 +74,33 @@ DEFINE_string(term, "", "inverted index term to query"); DEFINE_string(column_name, "", "inverted index column_name to query"); DEFINE_string(pred_type, "", "inverted index term query predicate, eq/lt/gt/le/ge/match etc."); DEFINE_bool(print_row_id, false, "print row id when query terms"); +DEFINE_bool(print_doc_id, false, "print doc id when check terms stats"); +// only for debug index compaction +DEFINE_int32(idx_id, -1, "inverted index id"); +DEFINE_string(src_idx_dirs_file, "", "source segment index files"); +DEFINE_string(dest_idx_dirs_file, "", "destination segment index files"); +DEFINE_string(dest_seg_num_rows_file, "", "destination segment number of rows"); +DEFINE_string(tablet_path, "", "tablet path"); +DEFINE_string(trans_vec_file, "", "rowid conversion map file"); std::string get_usage(const std::string& progname) { std::stringstream ss; ss << progname << " is the Doris inverted index file tool.\n"; - ss << "Stop BE first before use this tool.\n"; ss << "Usage:\n"; ss << "./index_tool --operation=show_nested_files 
--idx_file_path=path/to/file\n"; - ss << "./index_tool --operation=check_terms_stats --idx_file_path=path/to/file\n"; + ss << "./index_tool --operation=check_terms_stats --idx_file_path=path/to/file " + "--print_doc_id\n"; ss << "./index_tool --operation=term_query --directory=directory " "--idx_file_name=file --print_row_id --term=term --column_name=column_name " "--pred_type=eq/lt/gt/le/ge/match etc\n"; ss << "./index_tool --operation=write_index_v2 --idx_file_path=path/to/index " "--data_file_path=data/to/index\n"; + ss << "*** debug_index_compaction operation is only for offline debug index compaction, do not " + "use in production ***\n"; + ss << "./index_tool --operation=debug_index_compaction --idx_id=index_id " + "--src_idx_dirs_file=path/to/file --dest_idx_dirs_file=path/to/file " + "--dest_seg_num_rows_file=path/to/file --tablet_path=path/to/tablet " + "--trans_vec_file=path/to/file\n"; return ss.str(); } @@ -192,6 +208,14 @@ void check_terms_stats(lucene::store::Directory* dir) { printf("Term: %s ", token.c_str()); printf("Freq: %d\n", te->docFreq()); + if (FLAGS_print_doc_id) { + TermDocs* td = r->termDocs(te->term()); + while (td->next()) { + printf("DocID: %d ", td->doc()); + printf("TermFreq: %d\n", td->freq()); + } + _CLLDELETE(td); + } } printf("Term count: %d\n\n", nterms); te->close(); @@ -351,6 +375,117 @@ int main(int argc, char** argv) { } return -1; } + } else if (FLAGS_operation == "debug_index_compaction") { + // only for debug index compaction, do not use in production + if (FLAGS_idx_id <= 0 || FLAGS_src_idx_dirs_file == "" || FLAGS_dest_idx_dirs_file == "" || + FLAGS_dest_seg_num_rows_file == "" || FLAGS_tablet_path == "" || + FLAGS_trans_vec_file == "") { + std::cout << "invalid params for debug_index_compaction " << std::endl; + return -1; + } + + auto fs = doris::io::global_local_filesystem(); + + auto read_file_to_json = [&](const std::string& file, std::string& output) { + doris::io::FileReaderSPtr file_reader; + doris::Status 
status = fs->open_file(file, &file_reader); + if (!status.ok()) { + std::cout << "read file " << file << " failed" << std::endl; + return false; + } + size_t fsize = file_reader->size(); + if (fsize > 0) { + output.resize(fsize); + size_t bytes_read = 0; + status = file_reader->read_at(0, {output.data(), fsize}, &bytes_read); + } + if (!status.ok()) { + std::cout << "read file " << file << " failed" << std::endl; + return false; + } + return true; + }; + + int32_t index_id = FLAGS_idx_id; + std::string tablet_path = FLAGS_tablet_path; + std::string src_index_dirs_string; + std::string dest_index_dirs_string; + std::string dest_segment_num_rows_string; + std::string trans_vec_string; + + if (!read_file_to_json(FLAGS_src_idx_dirs_file, src_index_dirs_string) || + !read_file_to_json(FLAGS_dest_idx_dirs_file, dest_index_dirs_string) || + !read_file_to_json(FLAGS_dest_seg_num_rows_file, dest_segment_num_rows_string) || + !read_file_to_json(FLAGS_trans_vec_file, trans_vec_string)) { + return -1; + } + std::vector<std::string> src_index_files = nlohmann::json::parse(src_index_dirs_string); + std::vector<std::string> dest_index_files = nlohmann::json::parse(dest_index_dirs_string); + std::vector<uint32_t> dest_segment_num_rows = + nlohmann::json::parse(dest_segment_num_rows_string); + std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec = + nlohmann::json::parse(trans_vec_string); + int src_segment_num = src_index_files.size(); + int dest_segment_num = dest_index_files.size(); + + std::string index_writer_path = tablet_path + "/tmp_index_writer"; + lucene::store::Directory* dir = + DorisCompoundDirectoryFactory::getDirectory(fs, index_writer_path.c_str(), false); + lucene::analysis::SimpleAnalyzer<char> analyzer; + auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */, + true /* closeDirOnShutdown */); + std::ostream* infoStream = &std::cout; + index_writer->setInfoStream(infoStream); + // get compound directory src_index_dirs 
+ std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num); + for (int i = 0; i < src_segment_num; ++i) { + // format: rowsetId_segmentId_indexId.idx + std::string src_idx_full_name = + src_index_files[i] + "_" + std::to_string(index_id) + ".idx"; + DorisCompoundReader* reader = new DorisCompoundReader( + DorisCompoundDirectoryFactory::getDirectory(fs, tablet_path.c_str()), + src_idx_full_name.c_str()); + src_index_dirs[i] = reader; + } + + // get dest idx file paths + std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num); + for (int i = 0; i < dest_segment_num; ++i) { + // format: rowsetId_segmentId_columnId + auto path = tablet_path + "/" + dest_index_files[i] + "_" + std::to_string(index_id); + dest_index_dirs[i] = + DorisCompoundDirectoryFactory::getDirectory(fs, path.c_str(), true); + } + + index_writer->indexCompaction(src_index_dirs, dest_index_dirs, trans_vec, + dest_segment_num_rows); + + index_writer->close(); + _CLDELETE(index_writer); + // NOTE: need to ref_cnt-- for dir, + // when index_writer is destroyed, if closeDir is set, dir will be close + // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed. + _CLDECDELETE(dir) + for (auto d : src_index_dirs) { + if (d != nullptr) { + d->close(); + _CLDELETE(d); + } + } + for (auto d : dest_index_dirs) { + if (d != nullptr) { + // NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalize. + //d->close(); + _CLDELETE(d); + } + } + + // delete temporary index_writer_path + if (!fs->delete_directory(index_writer_path.c_str()).ok()) { + std::cout << "delete temporary index writer path: " << index_writer_path << " failed." 
+ << std::endl; + return -1; + } } else if (FLAGS_operation == "write_index_v2") { if (FLAGS_idx_file_path == "") { std::cout << "no index path flag for check " << std::endl; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c4dd3cf31c5..e852344688c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -27,6 +27,7 @@ #include <map> #include <memory> #include <mutex> +#include <nlohmann/json.hpp> #include <numeric> #include <ostream> #include <set> @@ -36,6 +37,7 @@ #include "common/config.h" #include "common/status.h" #include "io/fs/file_system.h" +#include "io/fs/file_writer.h" #include "io/fs/remote_file_system.h" #include "olap/cumulative_compaction_policy.h" #include "olap/cumulative_compaction_time_series_policy.h" @@ -471,6 +473,46 @@ Status Compaction::do_compaction_impl(int64_t permits) { dest_index_files[i] = prefix; } + // Only write info files when debug index compaction is enabled. + // The files are used to debug index compaction and works with index_tool. 
+ if (config::debug_inverted_index_compaction) { + auto write_json_to_file = [&](const nlohmann::json& json_obj, + const std::string& file_name) { + io::FileWriterPtr file_writer; + std::string file_path = + fmt::format("{}/{}.json", config::sys_log_dir, file_name); + RETURN_IF_ERROR( + io::global_local_filesystem()->create_file(file_path, &file_writer)); + RETURN_IF_ERROR(file_writer->append(json_obj.dump())); + RETURN_IF_ERROR(file_writer->append("\n")); + return file_writer->close(); + }; + + // Convert trans_vec to JSON and print it + nlohmann::json trans_vec_json = trans_vec; + auto output_version = _output_version.to_string().substr( + 1, _output_version.to_string().size() - 2); + RETURN_IF_ERROR(write_json_to_file( + trans_vec_json, + fmt::format("trans_vec_{}_{}", _tablet->tablet_id(), output_version))); + + nlohmann::json src_index_files_json = src_index_files; + RETURN_IF_ERROR(write_json_to_file( + src_index_files_json, + fmt::format("src_idx_dirs_{}_{}", _tablet->tablet_id(), output_version))); + + nlohmann::json dest_index_files_json = dest_index_files; + RETURN_IF_ERROR(write_json_to_file( + dest_index_files_json, + fmt::format("dest_idx_dirs_{}_{}", _tablet->tablet_id(), output_version))); + + nlohmann::json dest_segment_num_rows_json = dest_segment_num_rows; + RETURN_IF_ERROR( + write_json_to_file(dest_segment_num_rows_json, + fmt::format("dest_seg_num_rows_{}_{}", + _tablet->tablet_id(), output_version))); + } + // create index_writer to compaction indexes const auto& fs = _output_rowset->rowset_meta()->fs(); const auto& tablet_path = _tablet->tablet_path(); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 23f4428f747..cc9b9dc0b9c 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -71,8 +71,7 @@ TabletMetaSharedPtr TabletMeta::create( request.time_series_compaction_file_count_threshold, request.time_series_compaction_time_threshold_seconds, 
request.time_series_compaction_empty_rowsets_threshold, - request.inverted_index_storage_format, - request.time_series_compaction_level_threshold); + request.inverted_index_storage_format, request.time_series_compaction_level_threshold); } TabletMeta::TabletMeta() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org