This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a4a191fe56c [fix](index compaction)Fix MOW index compaction core (#32121) (#32657)
a4a191fe56c is described below

commit a4a191fe56c9d2b5f0d46315f8079de82a4a65ac
Author: qiye <jianliang5...@gmail.com>
AuthorDate: Fri Mar 22 14:20:19 2024 +0800

    [fix](index compaction)Fix MOW index compaction core (#32121) (#32657)
---
 be/src/clucene                    |   2 +-
 be/src/common/config.cpp          |   2 +
 be/src/common/config.h            |   2 +
 be/src/index-tools/index_tool.cpp | 141 +++++++++++++++++++++++++++++++++++++-
 be/src/olap/compaction.cpp        |  42 ++++++++++++
 be/src/olap/tablet_meta.cpp       |   3 +-
 6 files changed, 186 insertions(+), 6 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index c5ba0a26e9c..ef95e67ae31 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit c5ba0a26e9cab11a85dc3c5854e9ad258fa4fdf5
+Subproject commit ef95e67ae3123409f006072194f742a079603159
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6bb6d52d834..ab7bd39e034 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1024,6 +1024,8 @@ DEFINE_Int32(inverted_index_read_buffer_size, "4096");
 DEFINE_Int32(max_depth_in_bkd_tree, "32");
 // index compaction
 DEFINE_mBool(inverted_index_compaction_enable, "false");
+// Only for debug, do not use in production
+DEFINE_mBool(debug_inverted_index_compaction, "false");
 // index by RAM directory
 DEFINE_mBool(inverted_index_ram_dir_enable, "false");
 // use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 85db11b9e61..4aaed552d89 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1061,6 +1061,8 @@ DECLARE_Int32(inverted_index_read_buffer_size);
 DECLARE_Int32(max_depth_in_bkd_tree);
 // index compaction
 DECLARE_mBool(inverted_index_compaction_enable);
+// Only for debug, do not use in production
+DECLARE_mBool(debug_inverted_index_compaction);
 // index by RAM directory
 DECLARE_mBool(inverted_index_ram_dir_enable);
 // use num_broadcast_buffer blocks as buffer to do broadcast
diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp
index 3f541caf680..62593c11785 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -25,6 +25,7 @@
 #include <fstream>
 #include <iostream>
 #include <memory>
+#include <nlohmann/json.hpp>
 #include <roaring/roaring.hh>
 #include <sstream>
 #include <string>
@@ -62,7 +63,8 @@ using namespace lucene::index;
 using namespace lucene::util;
 using namespace lucene::search;
 
-DEFINE_string(operation, "", "valid operation: 
show_nested_files,check_terms,term_query");
+DEFINE_string(operation, "",
+              "valid operation: 
show_nested_files,check_terms,term_query,debug_index_compaction");
 
 DEFINE_string(directory, "./", "inverted index file directory");
 DEFINE_string(idx_file_name, "", "inverted index file name");
@@ -72,19 +74,33 @@ DEFINE_string(term, "", "inverted index term to query");
 DEFINE_string(column_name, "", "inverted index column_name to query");
 DEFINE_string(pred_type, "", "inverted index term query predicate, 
eq/lt/gt/le/ge/match etc.");
 DEFINE_bool(print_row_id, false, "print row id when query terms");
+DEFINE_bool(print_doc_id, false, "print doc id when check terms stats");
+// only for debug index compaction
+DEFINE_int32(idx_id, -1, "inverted index id");
+DEFINE_string(src_idx_dirs_file, "", "source segment index files");
+DEFINE_string(dest_idx_dirs_file, "", "destination segment index files");
+DEFINE_string(dest_seg_num_rows_file, "", "destination segment number of 
rows");
+DEFINE_string(tablet_path, "", "tablet path");
+DEFINE_string(trans_vec_file, "", "rowid conversion map file");
 
 std::string get_usage(const std::string& progname) {
     std::stringstream ss;
     ss << progname << " is the Doris inverted index file tool.\n";
-    ss << "Stop BE first before use this tool.\n";
     ss << "Usage:\n";
     ss << "./index_tool --operation=show_nested_files 
--idx_file_path=path/to/file\n";
-    ss << "./index_tool --operation=check_terms_stats 
--idx_file_path=path/to/file\n";
+    ss << "./index_tool --operation=check_terms_stats 
--idx_file_path=path/to/file "
+          "--print_doc_id\n";
     ss << "./index_tool --operation=term_query --directory=directory "
           "--idx_file_name=file --print_row_id --term=term 
--column_name=column_name "
           "--pred_type=eq/lt/gt/le/ge/match etc\n";
     ss << "./index_tool --operation=write_index_v2 
--idx_file_path=path/to/index "
           "--data_file_path=data/to/index\n";
+    ss << "*** debug_index_compaction operation is only for offline debug 
index compaction, do not "
+          "use in production ***\n";
+    ss << "./index_tool --operation=debug_index_compaction --idx_id=index_id "
+          "--src_idx_dirs_file=path/to/file --dest_idx_dirs_file=path/to/file "
+          "--dest_seg_num_rows_file=path/to/file --tablet_path=path/to/tablet "
+          "--trans_vec_file=path/to/file\n";
     return ss.str();
 }
 
@@ -192,6 +208,14 @@ void check_terms_stats(lucene::store::Directory* dir) {
 
         printf("Term: %s ", token.c_str());
         printf("Freq: %d\n", te->docFreq());
+        if (FLAGS_print_doc_id) {
+            TermDocs* td = r->termDocs(te->term());
+            while (td->next()) {
+                printf("DocID: %d ", td->doc());
+                printf("TermFreq: %d\n", td->freq());
+            }
+            _CLLDELETE(td);
+        }
     }
     printf("Term count: %d\n\n", nterms);
     te->close();
@@ -351,6 +375,117 @@ int main(int argc, char** argv) {
             }
             return -1;
         }
+    } else if (FLAGS_operation == "debug_index_compaction") {
+        // only for debug index compaction, do not use in production
+        if (FLAGS_idx_id <= 0 || FLAGS_src_idx_dirs_file == "" || 
FLAGS_dest_idx_dirs_file == "" ||
+            FLAGS_dest_seg_num_rows_file == "" || FLAGS_tablet_path == "" ||
+            FLAGS_trans_vec_file == "") {
+            std::cout << "invalid params for debug_index_compaction " << 
std::endl;
+            return -1;
+        }
+
+        auto fs = doris::io::global_local_filesystem();
+
+        auto read_file_to_json = [&](const std::string& file, std::string& 
output) {
+            doris::io::FileReaderSPtr file_reader;
+            doris::Status status = fs->open_file(file, &file_reader);
+            if (!status.ok()) {
+                std::cout << "read file " << file << " failed" << std::endl;
+                return false;
+            }
+            size_t fsize = file_reader->size();
+            if (fsize > 0) {
+                output.resize(fsize);
+                size_t bytes_read = 0;
+                status = file_reader->read_at(0, {output.data(), fsize}, 
&bytes_read);
+            }
+            if (!status.ok()) {
+                std::cout << "read file " << file << " failed" << std::endl;
+                return false;
+            }
+            return true;
+        };
+
+        int32_t index_id = FLAGS_idx_id;
+        std::string tablet_path = FLAGS_tablet_path;
+        std::string src_index_dirs_string;
+        std::string dest_index_dirs_string;
+        std::string dest_segment_num_rows_string;
+        std::string trans_vec_string;
+
+        if (!read_file_to_json(FLAGS_src_idx_dirs_file, src_index_dirs_string) 
||
+            !read_file_to_json(FLAGS_dest_idx_dirs_file, 
dest_index_dirs_string) ||
+            !read_file_to_json(FLAGS_dest_seg_num_rows_file, 
dest_segment_num_rows_string) ||
+            !read_file_to_json(FLAGS_trans_vec_file, trans_vec_string)) {
+            return -1;
+        }
+        std::vector<std::string> src_index_files = 
nlohmann::json::parse(src_index_dirs_string);
+        std::vector<std::string> dest_index_files = 
nlohmann::json::parse(dest_index_dirs_string);
+        std::vector<uint32_t> dest_segment_num_rows =
+                nlohmann::json::parse(dest_segment_num_rows_string);
+        std::vector<std::vector<std::pair<uint32_t, uint32_t>>> trans_vec =
+                nlohmann::json::parse(trans_vec_string);
+        int src_segment_num = src_index_files.size();
+        int dest_segment_num = dest_index_files.size();
+
+        std::string index_writer_path = tablet_path + "/tmp_index_writer";
+        lucene::store::Directory* dir =
+                DorisCompoundDirectoryFactory::getDirectory(fs, 
index_writer_path.c_str(), false);
+        lucene::analysis::SimpleAnalyzer<char> analyzer;
+        auto index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, 
true /* create */,
+                                                              true /* 
closeDirOnShutdown */);
+        std::ostream* infoStream = &std::cout;
+        index_writer->setInfoStream(infoStream);
+        // get compound directory src_index_dirs
+        std::vector<lucene::store::Directory*> src_index_dirs(src_segment_num);
+        for (int i = 0; i < src_segment_num; ++i) {
+            // format: rowsetId_segmentId_indexId.idx
+            std::string src_idx_full_name =
+                    src_index_files[i] + "_" + std::to_string(index_id) + 
".idx";
+            DorisCompoundReader* reader = new DorisCompoundReader(
+                    DorisCompoundDirectoryFactory::getDirectory(fs, 
tablet_path.c_str()),
+                    src_idx_full_name.c_str());
+            src_index_dirs[i] = reader;
+        }
+
+        // get dest idx file paths
+        std::vector<lucene::store::Directory*> 
dest_index_dirs(dest_segment_num);
+        for (int i = 0; i < dest_segment_num; ++i) {
+            // format: rowsetId_segmentId_columnId
+            auto path = tablet_path + "/" + dest_index_files[i] + "_" + 
std::to_string(index_id);
+            dest_index_dirs[i] =
+                    DorisCompoundDirectoryFactory::getDirectory(fs, 
path.c_str(), true);
+        }
+
+        index_writer->indexCompaction(src_index_dirs, dest_index_dirs, 
trans_vec,
+                                      dest_segment_num_rows);
+
+        index_writer->close();
+        _CLDELETE(index_writer);
+        // NOTE: need to ref_cnt-- for dir,
+        // when index_writer is destroyed, if closeDir is set, dir will be closed.
+        // _CLDECDELETE(dir) will try to ref_cnt--, when it decreases to 1, dir will be destroyed.
+        _CLDECDELETE(dir)
+        for (auto d : src_index_dirs) {
+            if (d != nullptr) {
+                d->close();
+                _CLDELETE(d);
+            }
+        }
+        for (auto d : dest_index_dirs) {
+            if (d != nullptr) {
+                // NOTE: DO NOT close dest dir here, because it will be closed when dest index writer finalizes.
+                //d->close();
+                _CLDELETE(d);
+            }
+        }
+
+        // delete temporary index_writer_path
+        if (!fs->delete_directory(index_writer_path.c_str()).ok()) {
+            std::cout << "delete temporary index writer path: " << 
index_writer_path << " failed."
+                      << std::endl;
+            return -1;
+        }
     } else if (FLAGS_operation == "write_index_v2") {
         if (FLAGS_idx_file_path == "") {
             std::cout << "no index path flag for check " << std::endl;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c4dd3cf31c5..e852344688c 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -27,6 +27,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <nlohmann/json.hpp>
 #include <numeric>
 #include <ostream>
 #include <set>
@@ -36,6 +37,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_system.h"
+#include "io/fs/file_writer.h"
 #include "io/fs/remote_file_system.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/cumulative_compaction_time_series_policy.h"
@@ -471,6 +473,46 @@ Status Compaction::do_compaction_impl(int64_t permits) {
                 dest_index_files[i] = prefix;
             }
 
+            // Only write info files when debug index compaction is enabled.
+            // The files are used to debug index compaction and work with index_tool.
+            if (config::debug_inverted_index_compaction) {
+                auto write_json_to_file = [&](const nlohmann::json& json_obj,
+                                              const std::string& file_name) {
+                    io::FileWriterPtr file_writer;
+                    std::string file_path =
+                            fmt::format("{}/{}.json", config::sys_log_dir, 
file_name);
+                    RETURN_IF_ERROR(
+                            
io::global_local_filesystem()->create_file(file_path, &file_writer));
+                    RETURN_IF_ERROR(file_writer->append(json_obj.dump()));
+                    RETURN_IF_ERROR(file_writer->append("\n"));
+                    return file_writer->close();
+                };
+
+                // Convert trans_vec to JSON and write it to a file
+                nlohmann::json trans_vec_json = trans_vec;
+                auto output_version = _output_version.to_string().substr(
+                        1, _output_version.to_string().size() - 2);
+                RETURN_IF_ERROR(write_json_to_file(
+                        trans_vec_json,
+                        fmt::format("trans_vec_{}_{}", _tablet->tablet_id(), 
output_version)));
+
+                nlohmann::json src_index_files_json = src_index_files;
+                RETURN_IF_ERROR(write_json_to_file(
+                        src_index_files_json,
+                        fmt::format("src_idx_dirs_{}_{}", 
_tablet->tablet_id(), output_version)));
+
+                nlohmann::json dest_index_files_json = dest_index_files;
+                RETURN_IF_ERROR(write_json_to_file(
+                        dest_index_files_json,
+                        fmt::format("dest_idx_dirs_{}_{}", 
_tablet->tablet_id(), output_version)));
+
+                nlohmann::json dest_segment_num_rows_json = 
dest_segment_num_rows;
+                RETURN_IF_ERROR(
+                        write_json_to_file(dest_segment_num_rows_json,
+                                           
fmt::format("dest_seg_num_rows_{}_{}",
+                                                       _tablet->tablet_id(), 
output_version)));
+            }
+
             // create index_writer to compaction indexes
             const auto& fs = _output_rowset->rowset_meta()->fs();
             const auto& tablet_path = _tablet->tablet_path();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 23f4428f747..cc9b9dc0b9c 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -71,8 +71,7 @@ TabletMetaSharedPtr TabletMeta::create(
             request.time_series_compaction_file_count_threshold,
             request.time_series_compaction_time_threshold_seconds,
             request.time_series_compaction_empty_rowsets_threshold,
-            request.inverted_index_storage_format,
-            request.time_series_compaction_level_threshold);
+            request.inverted_index_storage_format, 
request.time_series_compaction_level_threshold);
 }
 
 TabletMeta::TabletMeta()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to