This is an automated email from the ASF dual-hosted git repository. airborne pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new fdbd9caf977 branch-3.0: [opt](inverted index) add performance profiling for remote io access in inverted index #43542 (#44093) fdbd9caf977 is described below commit fdbd9caf9775e166ede720246dae10c9a38c6d66 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Mon Nov 18 11:59:23 2024 +0800 branch-3.0: [opt](inverted index) add performance profiling for remote io access in inverted index #43542 (#44093) Cherry-picked from #43542 Co-authored-by: zzzxl <yangs...@selectdb.com> --- be/src/clucene | 2 +- be/src/index-tools/index_tool.cpp | 2 +- be/src/olap/compaction.cpp | 9 +- be/src/olap/rowset/segment_v2/column_reader.cpp | 2 +- .../inverted_index/query/conjunction_query.cpp | 5 +- .../inverted_index/query/conjunction_query.h | 3 +- .../inverted_index/query/disjunction_query.cpp | 6 +- .../inverted_index/query/disjunction_query.h | 3 +- .../inverted_index/query/phrase_edge_query.cpp | 2 +- .../inverted_index/query/phrase_edge_query.h | 2 +- .../inverted_index/query/phrase_prefix_query.cpp | 3 +- .../inverted_index/query/phrase_prefix_query.h | 2 +- .../inverted_index/query/phrase_query.cpp | 8 +- .../segment_v2/inverted_index/query/phrase_query.h | 3 +- .../rowset/segment_v2/inverted_index/query/query.h | 1 + .../inverted_index/query/regexp_query.cpp | 4 +- .../segment_v2/inverted_index/query/regexp_query.h | 2 +- .../segment_v2/inverted_index_compound_reader.cpp | 15 +++ .../segment_v2/inverted_index_file_reader.cpp | 5 +- .../rowset/segment_v2/inverted_index_file_reader.h | 4 +- .../segment_v2/inverted_index_fs_directory.cpp | 37 ++++++- .../segment_v2/inverted_index_fs_directory.h | 7 +- .../rowset/segment_v2/inverted_index_reader.cpp | 94 +++++++++-------- .../olap/rowset/segment_v2/inverted_index_reader.h | 83 ++++++++------- .../compaction/index_compaction_test.cpp | 4 +- .../index_compaction_with_deleted_term.cpp | 4 +- .../fault_injection_p0/test_index_io_context.out | 73 +++++++++++++ .../test_index_io_context.groovy | 113 +++++++++++++++++++++ 28 files changed, 373 insertions(+), 125 deletions(-) diff --git a/be/src/clucene b/be/src/clucene index 7cf6cf410d4..48fa9cc4ec3 160000 --- a/be/src/clucene +++ b/be/src/clucene @@ -1 +1 @@ -Subproject commit 7cf6cf410d41d95456edba263cc55b7b6f5ab027 +Subproject commit 48fa9cc4ec32b40bf3b02338d0a1b2cdbc6408cf diff --git a/be/src/index-tools/index_tool.cpp b/be/src/index-tools/index_tool.cpp index adea2cd84c9..ca0575dc545 100644 --- a/be/src/index-tools/index_tool.cpp +++ b/be/src/index-tools/index_tool.cpp @@ -170,7 +170,7 @@ void search(lucene::store::Directory* dir, std::string& field, std::string& toke std::vector<std::string> terms = split(token, '|'); doris::TQueryOptions queryOptions; - ConjunctionQuery conjunct_query(s, queryOptions); + ConjunctionQuery conjunct_query(s, queryOptions, nullptr); conjunct_query.add(field_ws, terms); conjunct_query.search(result); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index abbd84001c8..e608033701e 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -607,11 +607,9 @@ Status Compaction::do_inverted_index_compaction() { fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, _cur_tablet_schema->get_inverted_index_storage_format(), rowset->rowset_meta()->inverted_index_file_info(seg_id)); - bool open_idx_file_cache = false; RETURN_NOT_OK_STATUS_WITH_WARN( - inverted_index_file_reader->init(config::inverted_index_read_buffer_size, - open_idx_file_cache), - "inverted_index_file_reader init failed"); + inverted_index_file_reader->init(config::inverted_index_read_buffer_size), + "inverted_index_file_reader init faiqled"); inverted_index_file_readers[m.second] = std::move(inverted_index_file_reader); } @@ -779,9 +777,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { InvertedIndexDescriptor::get_index_file_path_prefix(*seg_path)}, _cur_tablet_schema->get_inverted_index_storage_format(), rowset->rowset_meta()->inverted_index_file_info(i)); - bool open_idx_file_cache = false; auto st = inverted_index_file_reader->init( - config::inverted_index_read_buffer_size, open_idx_file_cache); + config::inverted_index_read_buffer_size); index_file_path = inverted_index_file_reader->get_index_file_path(index_meta); DBUG_EXECUTE_IF( "Compaction::construct_skip_inverted_index_index_file_reader_init_" diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index e2e6e93f602..66e24fc105e 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -336,7 +336,7 @@ Status ColumnReader::new_inverted_index_iterator( { std::shared_lock<std::shared_mutex> rlock(_load_index_lock); if (_inverted_index) { - RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.stats, + RETURN_IF_ERROR(_inverted_index->new_iterator(read_options.io_ctx, read_options.stats, read_options.runtime_state, iterator)); } } diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp index fb247951716..6e9d61db7fd 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.cpp @@ -20,8 +20,9 @@ namespace doris::segment_v2 { ConjunctionQuery::ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options) + const TQueryOptions& query_options, const io::IOContext* io_ctx) : _searcher(searcher), + _io_ctx(io_ctx), _index_version(_searcher->getReader()->getIndexVersion()), _conjunction_ratio(query_options.inverted_index_conjunction_opt_threshold) {} @@ -48,7 +49,7 @@ void ConjunctionQuery::add(const std::wstring& field_name, const std::vector<std std::wstring ws_term = StringUtil::string_to_wstring(term); Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str()); _terms.push_back(t); - TermDocs* term_doc = _searcher->getReader()->termDocs(t); + TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx); _term_docs.push_back(term_doc); iterators.emplace_back(term_doc); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h index 2571392d529..b9bfee2bfb1 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/conjunction_query.h @@ -27,7 +27,7 @@ namespace doris::segment_v2 { class ConjunctionQuery : public Query { public: ConjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options); + const TQueryOptions& query_options, const io::IOContext* io_ctx); ~ConjunctionQuery() override; void add(const std::wstring& field_name, const std::vector<std::string>& terms) override; @@ -41,6 +41,7 @@ private: public: std::shared_ptr<lucene::search::IndexSearcher> _searcher; + const io::IOContext* _io_ctx = nullptr; IndexVersion _index_version = IndexVersion::kV0; int32_t _conjunction_ratio = 1000; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp index 650a88c0646..852357073d3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.cpp @@ -20,8 +20,8 @@ namespace doris::segment_v2 { DisjunctionQuery::DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options) - : _searcher(searcher) {} + const TQueryOptions& query_options, const io::IOContext* io_ctx) + : _searcher(searcher), _io_ctx(io_ctx) {} void DisjunctionQuery::add(const std::wstring& field_name, const std::vector<std::string>& terms) { if (terms.empty()) { @@ -36,7 +36,7 @@ void DisjunctionQuery::search(roaring::Roaring& roaring) { auto func = [this, &roaring](const std::string& term, bool first) { std::wstring ws_term = StringUtil::string_to_wstring(term); auto* t = _CLNEW Term(_field_name.c_str(), ws_term.c_str()); - auto* term_doc = _searcher->getReader()->termDocs(t); + auto* term_doc = _searcher->getReader()->termDocs(t, _io_ctx); TermIterator iterator(term_doc); DocRange doc_range; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h index 35783146157..8d0559ee4b0 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/disjunction_query.h @@ -27,7 +27,7 @@ namespace doris::segment_v2 { class DisjunctionQuery : public Query { public: DisjunctionQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options); + const TQueryOptions& query_options, const io::IOContext* io_ctx); ~DisjunctionQuery() override = default; void add(const std::wstring& field_name, const std::vector<std::string>& terms) override; @@ -35,6 +35,7 @@ public: private: std::shared_ptr<lucene::search::IndexSearcher> _searcher; + const io::IOContext* _io_ctx = nullptr; std::wstring _field_name; std::vector<std::string> _terms; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp index ec1b5bdd9e4..f82433826e9 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.cpp @@ -30,7 +30,7 @@ namespace doris::segment_v2 { PhraseEdgeQuery::PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options) + const TQueryOptions& query_options, const io::IOContext* io_ctx) : _searcher(searcher), _query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()), _max_expansions(query_options.inverted_index_max_expansions) {} diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h index 5daf382e0d0..9eb3bd57c4a 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_edge_query.h @@ -31,7 +31,7 @@ namespace doris::segment_v2 { class PhraseEdgeQuery : public Query { public: PhraseEdgeQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options); + const TQueryOptions& query_options, const io::IOContext* io_ctx); ~PhraseEdgeQuery() override = default; void add(const std::wstring& field_name, const std::vector<std::string>& terms) override; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp index 407e515dc92..88bb3c1171f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.cpp @@ -23,7 +23,8 @@ namespace doris::segment_v2 { PhrasePrefixQuery::PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options) + const TQueryOptions& query_options, + const io::IOContext* io_ctx) : _searcher(searcher), _query(std::make_unique<CL_NS(search)::MultiPhraseQuery>()), _max_expansions(query_options.inverted_index_max_expansions) {} diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h index e565c0409cf..5cac597951e 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_prefix_query.h @@ -31,7 +31,7 @@ namespace doris::segment_v2 { class PhrasePrefixQuery : public Query { public: PhrasePrefixQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options); + const TQueryOptions& query_options, const io::IOContext* io_ctx); ~PhrasePrefixQuery() override = default; void add(const std::wstring& field_name, const std::vector<std::string>& terms) override; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp index 0ca2dce94e3..13081869712 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.cpp @@ -118,8 +118,8 @@ bool OrderedSloppyPhraseMatcher::stretch_to_order(PostingsAndPosition* prev_post } PhraseQuery::PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options) - : _searcher(searcher) {} + const TQueryOptions& query_options, const io::IOContext* io_ctx) + : _searcher(searcher), _io_ctx(io_ctx) {} PhraseQuery::~PhraseQuery() { for (auto& term_doc : _term_docs) { @@ -166,7 +166,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str std::wstring ws_term = StringUtil::string_to_wstring(terms[0]); Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str()); _terms.push_back(t); - TermDocs* term_doc = _searcher->getReader()->termDocs(t); + TermDocs* term_doc = _searcher->getReader()->termDocs(t, _io_ctx); _term_docs.push_back(term_doc); _lead1 = TermIterator(term_doc); return; @@ -177,7 +177,7 @@ void PhraseQuery::add(const std::wstring& field_name, const std::vector<std::str std::wstring ws_term = StringUtil::string_to_wstring(term); Term* t = _CLNEW Term(field_name.c_str(), ws_term.c_str()); _terms.push_back(t); - TermPositions* term_pos = _searcher->getReader()->termPositions(t); + TermPositions* term_pos = _searcher->getReader()->termPositions(t, _io_ctx); _term_docs.push_back(term_pos); iterators.emplace_back(term_pos); return term_pos; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h index 253ba782b78..006d1eddec1 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/phrase_query.h @@ -85,7 +85,7 @@ using Matcher = std::variant<ExactPhraseMatcher, OrderedSloppyPhraseMatcher, Phr class PhraseQuery : public Query { public: PhraseQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options); + const TQueryOptions& query_options, const io::IOContext* io_ctx); ~PhraseQuery() override; void add(const InvertedIndexQueryInfo& query_info) override; @@ -106,6 +106,7 @@ public: private: std::shared_ptr<lucene::search::IndexSearcher> _searcher; + const io::IOContext* _io_ctx = nullptr; TermIterator _lead1; TermIterator _lead2; diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/query.h index cef7fd51f72..786abf8acd9 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/query.h @@ -27,6 +27,7 @@ #include <memory> #include "common/status.h" +#include "io/io_common.h" #include "roaring/roaring.hh" CL_NS_USE(index) diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp index 007da8289dc..69de4b7818b 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.cpp @@ -25,10 +25,10 @@ namespace doris::segment_v2 { RegexpQuery::RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options) + const TQueryOptions& query_options, const io::IOContext* io_ctx) : _searcher(searcher), _max_expansions(query_options.inverted_index_max_expansions), - _query(searcher, query_options) {} + _query(searcher, query_options, io_ctx) {} void RegexpQuery::add(const std::wstring& field_name, const std::vector<std::string>& patterns) { if (patterns.size() != 1) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h index 336b2d0b6a6..650ad2bf10b 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h +++ b/be/src/olap/rowset/segment_v2/inverted_index/query/regexp_query.h @@ -28,7 +28,7 @@ namespace doris::segment_v2 { class RegexpQuery : public Query { public: RegexpQuery(const std::shared_ptr<lucene::search::IndexSearcher>& searcher, - const TQueryOptions& query_options); + const TQueryOptions& query_options, const io::IOContext* io_ctx); ~RegexpQuery() override = default; void add(const std::wstring& field_name, const std::vector<std::string>& patterns) override; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 7613df112ed..60006ea8455 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -59,6 +59,8 @@ private: CL_NS(store)::IndexInput* base; int64_t fileOffset; int64_t _length; + const io::IOContext* _io_ctx = nullptr; + bool _is_index_file = false; // Indicates if the file is a TII file protected: void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override; @@ -75,6 +77,8 @@ public: const char* getDirectoryType() const override { return DorisCompoundReader::getClassName(); } const char* getObjectName() const override { return getClassName(); } static const char* getClassName() { return "CSIndexInput"; } + void setIoContext(const void* io_ctx) override; + void setIndexFile(bool isIndexFile) override; }; CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, @@ -92,9 +96,12 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) { if (start + len > _length) { _CLTHROWA(CL_ERR_IO, "read past EOF"); } + base->setIoContext(_io_ctx); + base->setIndexFile(_is_index_file); base->seek(fileOffset + start); bool read_from_buffer = true; base->readBytes(b, len, read_from_buffer); + base->setIoContext(nullptr); } CSIndexInput::~CSIndexInput() = default; @@ -111,6 +118,14 @@ CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone void CSIndexInput::close() {} +void CSIndexInput::setIoContext(const void* io_ctx) { + _io_ctx = static_cast<const io::IOContext*>(io_ctx); +} + +void CSIndexInput::setIndexFile(bool isIndexFile) { + _is_index_file = isIndexFile; +} + DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size) : _ram_dir(new lucene::store::RAMDirectory()), _stream(stream), diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp index e0c75922c98..113833d560f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp @@ -27,10 +27,9 @@ namespace doris::segment_v2 { -Status InvertedIndexFileReader::init(int32_t read_buffer_size, bool open_idx_file_cache) { +Status InvertedIndexFileReader::init(int32_t read_buffer_size) { if (!_inited) { _read_buffer_size = read_buffer_size; - _open_idx_file_cache = open_idx_file_cache; if (_storage_format == InvertedIndexStorageFormatPB::V2) { auto st = _init_from_v2(read_buffer_size); if (!st.ok()) { @@ -76,7 +75,6 @@ Status InvertedIndexFileReader::_init_from_v2(int32_t read_buffer_size) { "CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path, err.what()); } - index_input->setIdxFileCache(_open_idx_file_cache); _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input); // 3. read file @@ -198,7 +196,6 @@ Result<std::unique_ptr<DorisCompoundReader>> InvertedIndexFileReader::_open( } // 3. read file in DorisCompoundReader - index_input->setIdxFileCache(_open_idx_file_cache); compound_reader = std::make_unique<DorisCompoundReader>(index_input, _read_buffer_size); } catch (CLuceneError& err) { return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h index 8bc28b1882f..3b7161c7643 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.h @@ -58,8 +58,7 @@ public: _storage_format(storage_format), _idx_file_info(idx_file_info) {} - Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size, - bool open_idx_file_cache = false); + Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size); Result<std::unique_ptr<DorisCompoundReader>> open(const TabletIndex* index_meta) const; void debug_file_entries(); std::string get_index_file_cache_key(const TabletIndex* index_meta) const; @@ -80,7 +79,6 @@ private: const io::FileSystemSPtr _fs; std::string _index_path_prefix; int32_t _read_buffer_size = -1; - bool _open_idx_file_cache = false; InvertedIndexStorageFormatPB _storage_format; mutable std::shared_mutex _mutex; // Use mutable for const read operations bool _inited = false; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index ded71c8a6cc..29caf29936d 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -219,6 +219,27 @@ void DorisFSDirectory::FSIndexInput::close() { }*/ } +void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) { + if (io_ctx) { + const auto& ctx = static_cast<const io::IOContext*>(io_ctx); + _io_ctx.reader_type = ctx->reader_type; + _io_ctx.query_id = ctx->query_id; + _io_ctx.file_cache_stats = ctx->file_cache_stats; + } else { + _io_ctx.reader_type = ReaderType::UNKNOWN; + _io_ctx.query_id = nullptr; + _io_ctx.file_cache_stats = nullptr; + } +} + +const void* DorisFSDirectory::FSIndexInput::getIoContext() { + return &_io_ctx; +} + +void DorisFSDirectory::FSIndexInput::setIndexFile(bool isIndexFile) { + _io_ctx.is_index_data = isIndexFile; +} + void DorisFSDirectory::FSIndexInput::seekInternal(const int64_t position) { CND_PRECONDITION(position >= 0 && position < _handle->_length, "Seeking out of range"); _pos = position; @@ -239,9 +260,23 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) _handle->_fpos = _pos; } + DBUG_EXECUTE_IF( + "DorisFSDirectory::FSIndexInput::readInternal", ({ + static thread_local std::unordered_map<const TUniqueId*, io::FileCacheStatistics*> + thread_file_cache_map; + auto it = thread_file_cache_map.find(_io_ctx.query_id); + if (it != thread_file_cache_map.end()) { + if (_io_ctx.file_cache_stats != it->second) { + _CLTHROWA(CL_ERR_IO, "File cache statistics mismatch"); + } + } else { + thread_file_cache_map[_io_ctx.query_id] = _io_ctx.file_cache_stats; + } + })); + Slice result {b, (size_t)len}; size_t bytes_read = 0; - auto st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx); + Status st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx); DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", { st = Status::InternalError( "debug point: DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error"); diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h index 59ae6db1a96..fd92873c970 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.h @@ -180,8 +180,6 @@ class DorisFSDirectory::FSIndexInput : public lucene::store::BufferedIndexInput : BufferedIndexInput(buffer_size) { this->_pos = 0; this->_handle = std::move(handle); - this->_io_ctx.reader_type = ReaderType::READER_QUERY; - this->_io_ctx.is_index_data = false; } protected: @@ -199,8 +197,9 @@ public: const char* getDirectoryType() const override { return DorisFSDirectory::getClassName(); } const char* getObjectName() const override { return getClassName(); } static const char* getClassName() { return "FSIndexInput"; } - - void setIdxFileCache(bool index) override { _io_ctx.is_index_data = index; } + void setIoContext(const void* io_ctx) override; + const void* getIoContext() override; + void setIndexFile(bool isIndexFile) override; std::mutex _this_lock; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp index 7b8504322d2..b31ba80ee46 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp @@ -102,7 +102,8 @@ std::string InvertedIndexReader::get_index_file_path() { return _inverted_index_file_reader->get_index_file_path(&_index_meta); } -Status InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats, +Status InvertedIndexReader::read_null_bitmap(const io::IOContext* io_ctx, + OlapReaderStatistics* stats, InvertedIndexQueryCacheHandle* cache_handle, lucene::store::Directory* dir) { SCOPED_RAW_TIMER(&stats->inverted_index_query_null_bitmap_timer); @@ -120,9 +121,7 @@ Status InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats, if (!dir) { // TODO: ugly code here, try to refact. - bool open_idx_file_cache = true; - auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size, - open_idx_file_cache); + auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size); if (!st.ok()) { LOG(WARNING) << st; return st; @@ -138,6 +137,7 @@ Status InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats, InvertedIndexDescriptor::get_temporary_null_bitmap_file_name(); if (dir->fileExists(null_bitmap_file_name)) { null_bitmap_in = dir->openInput(null_bitmap_file_name); + null_bitmap_in->setIoContext(io_ctx); size_t null_bitmap_size = null_bitmap_in->length(); faststring buf; buf.resize(null_bitmap_size); @@ -165,7 +165,8 @@ Status InvertedIndexReader::read_null_bitmap(OlapReaderStatistics* stats, } Status InvertedIndexReader::handle_searcher_cache( - InvertedIndexCacheHandle* inverted_index_cache_handle, OlapReaderStatistics* stats) { + InvertedIndexCacheHandle* inverted_index_cache_handle, const io::IOContext* io_ctx, + OlapReaderStatistics* stats) { auto index_file_key = _inverted_index_file_reader->get_index_file_cache_key(&_index_meta); InvertedIndexSearcherCache::CacheKey searcher_cache_key(index_file_key); if (InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key, @@ -179,9 +180,7 @@ Status InvertedIndexReader::handle_searcher_cache( SCOPED_RAW_TIMER(&stats->inverted_index_searcher_open_timer); IndexSearcherPtr searcher; - bool open_idx_file_cache = true; - auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size, - open_idx_file_cache); + auto st = _inverted_index_file_reader->init(config::inverted_index_read_buffer_size); if (!st.ok()) { LOG(WARNING) << st; return st; @@ -191,7 +190,7 @@ Status InvertedIndexReader::handle_searcher_cache( // to avoid open directory additionally for null_bitmap // TODO: handle null bitmap procedure in new format. InvertedIndexQueryCacheHandle null_bitmap_cache_handle; - static_cast<void>(read_null_bitmap(stats, &null_bitmap_cache_handle, dir.get())); + static_cast<void>(read_null_bitmap(io_ctx, stats, &null_bitmap_cache_handle, dir.get())); RETURN_IF_ERROR(create_index_searcher(dir.release(), &searcher, mem_tracker.get(), type())); auto* cache_value = new InvertedIndexSearcherCache::CacheValue( std::move(searcher), mem_tracker->consumption(), UnixMillis()); @@ -211,22 +210,21 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir, auto searcher_result = DORIS_TRY(index_searcher_builder->get_index_searcher(dir)); *searcher = searcher_result; - if (std::string(dir->getObjectName()) == "DorisCompoundReader") { - static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIdxFileCache(false); - } + // NOTE: before mem_tracker hook becomes active, we caculate reader memory size by hand. mem_tracker->consume(index_searcher_builder->get_reader_size()); return Status::OK(); }; Status InvertedIndexReader::match_index_search( - OlapReaderStatistics* stats, RuntimeState* runtime_state, InvertedIndexQueryType query_type, - const InvertedIndexQueryInfo& query_info, const FulltextIndexSearcherPtr& index_searcher, + const io::IOContext* io_ctx, OlapReaderStatistics* stats, RuntimeState* runtime_state, + InvertedIndexQueryType query_type, const InvertedIndexQueryInfo& query_info, + const FulltextIndexSearcherPtr& index_searcher, const std::shared_ptr<roaring::Roaring>& term_match_bitmap) { TQueryOptions queryOptions = runtime_state->query_options(); try { SCOPED_RAW_TIMER(&stats->inverted_index_searcher_search_timer); - auto query = QueryFactory::create(query_type, index_searcher, queryOptions); + auto query = QueryFactory::create(query_type, index_searcher, queryOptions, io_ctx); if (!query) { return Status::Error<ErrorCode::INVERTED_INDEX_INVALID_PARAMETERS>( "query type " + query_type_to_string(query_type) + ", query is nullptr"); @@ -240,15 +238,17 @@ Status InvertedIndexReader::match_index_search( return Status::OK(); } -Status FullTextIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, +Status FullTextIndexReader::new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) { - *iterator = InvertedIndexIterator::create_unique(stats, runtime_state, shared_from_this()); + *iterator = + InvertedIndexIterator::create_unique(io_ctx, stats, runtime_state, shared_from_this()); return Status::OK(); } -Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* runtime_state, - const std::string& column_name, const void* query_value, - InvertedIndexQueryType query_type, +Status FullTextIndexReader::query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, const std::string& column_name, + const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<roaring::Roaring>& bit_map) { SCOPED_RAW_TIMER(&stats->inverted_index_query_timer); @@ -325,12 +325,12 @@ Status FullTextIndexReader::query(OlapReaderStatistics* stats, RuntimeState* run FulltextIndexSearcherPtr* searcher_ptr = nullptr; InvertedIndexCacheHandle inverted_index_cache_handle; - RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, stats)); + RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, io_ctx, stats)); auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); searcher_ptr = std::get_if<FulltextIndexSearcherPtr>(&searcher_variant); if (searcher_ptr != nullptr) { term_match_bitmap = std::make_shared<roaring::Roaring>(); - RETURN_IF_ERROR(match_index_search(stats, runtime_state, query_type, query_info, + RETURN_IF_ERROR(match_index_search(io_ctx, stats, runtime_state, query_type, query_info, *searcher_ptr, term_match_bitmap)); term_match_bitmap->runOptimize(); cache->insert(cache_key, term_match_bitmap, &cache_handler); @@ -348,13 +348,15 @@ InvertedIndexReaderType FullTextIndexReader::type() { } Status StringTypeInvertedIndexReader::new_iterator( - OlapReaderStatistics* stats, RuntimeState* runtime_state, + const io::IOContext& io_ctx, OlapReaderStatistics* stats, RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) { - *iterator = InvertedIndexIterator::create_unique(stats, runtime_state, shared_from_this()); + *iterator = + InvertedIndexIterator::create_unique(io_ctx, stats, runtime_state, shared_from_this()); return Status::OK(); } -Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats, +Status StringTypeInvertedIndexReader::query(const io::IOContext* io_ctx, + OlapReaderStatistics* stats, RuntimeState* runtime_state, const std::string& column_name, const void* query_value, InvertedIndexQueryType query_type, @@ -398,7 +400,7 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats, auto result = std::make_shared<roaring::Roaring>(); FulltextIndexSearcherPtr* searcher_ptr = nullptr; InvertedIndexCacheHandle inverted_index_cache_handle; - RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, stats)); + RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, io_ctx, stats)); auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); searcher_ptr = std::get_if<FulltextIndexSearcherPtr>(&searcher_variant); if (searcher_ptr != nullptr) { @@ -407,7 +409,7 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats, case InvertedIndexQueryType::MATCH_ANY_QUERY: case InvertedIndexQueryType::MATCH_ALL_QUERY: case InvertedIndexQueryType::EQUAL_QUERY: { - RETURN_IF_ERROR(match_index_search(stats, runtime_state, + RETURN_IF_ERROR(match_index_search(io_ctx, stats, runtime_state, InvertedIndexQueryType::MATCH_ANY_QUERY, query_info, *searcher_ptr, result)); break; @@ -415,8 +417,8 @@ Status StringTypeInvertedIndexReader::query(OlapReaderStatistics* stats, case InvertedIndexQueryType::MATCH_PHRASE_QUERY: case InvertedIndexQueryType::MATCH_PHRASE_PREFIX_QUERY: case InvertedIndexQueryType::MATCH_REGEXP_QUERY: { - RETURN_IF_ERROR(match_index_search(stats, runtime_state, query_type, query_info, - *searcher_ptr, result)); + RETURN_IF_ERROR(match_index_search(io_ctx, stats, runtime_state, query_type, + query_info, *searcher_ptr, result)); break; } case InvertedIndexQueryType::LESS_THAN_QUERY: @@ -481,9 +483,11 @@ InvertedIndexReaderType StringTypeInvertedIndexReader::type() { return InvertedIndexReaderType::STRING_TYPE; } -Status BkdIndexReader::new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, +Status BkdIndexReader::new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) { - *iterator = InvertedIndexIterator::create_unique(stats, runtime_state, shared_from_this()); + *iterator = + InvertedIndexIterator::create_unique(io_ctx, stats, runtime_state, shared_from_this()); return Status::OK(); } @@ -611,12 +615,12 @@ Status BkdIndexReader::invoke_bkd_query(const void* query_value, InvertedIndexQu return Status::OK(); } -Status BkdIndexReader::try_query(OlapReaderStatistics* stats, const std::string& column_name, - const void* query_value, InvertedIndexQueryType query_type, - uint32_t* count) { +Status BkdIndexReader::try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + const std::string& column_name, const void* query_value, + InvertedIndexQueryType query_type, uint32_t* count) { try { std::shared_ptr<lucene::util::bkd::bkd_reader> r; - auto st = get_bkd_reader(r, stats); + auto st = get_bkd_reader(r, io_ctx, stats); if (!st.ok()) { LOG(WARNING) << "get bkd reader for " << _inverted_index_file_reader->get_index_file_path(&_index_meta) @@ -648,15 +652,15 @@ Status BkdIndexReader::try_query(OlapReaderStatistics* stats, const std::string& return Status::OK(); } -Status BkdIndexReader::query(OlapReaderStatistics* stats, RuntimeState* runtime_state, - const std::string& column_name, const void* query_value, - InvertedIndexQueryType query_type, +Status BkdIndexReader::query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, const std::string& column_name, + const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<roaring::Roaring>& bit_map) { SCOPED_RAW_TIMER(&stats->inverted_index_query_timer); try { std::shared_ptr<lucene::util::bkd::bkd_reader> r; - auto st = get_bkd_reader(r, stats); + auto st = get_bkd_reader(r, io_ctx, stats); if (!st.ok()) { LOG(WARNING) << "get bkd reader for " << _inverted_index_file_reader->get_index_file_path(&_index_meta) @@ -692,11 +696,11 @@ Status BkdIndexReader::query(OlapReaderStatistics* stats, RuntimeState* runtime_ } } -Status BkdIndexReader::get_bkd_reader(BKDIndexSearcherPtr& bkd_reader, +Status BkdIndexReader::get_bkd_reader(BKDIndexSearcherPtr& bkd_reader, const io::IOContext* io_ctx, OlapReaderStatistics* stats) { BKDIndexSearcherPtr* bkd_searcher = nullptr; InvertedIndexCacheHandle inverted_index_cache_handle; - RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, stats)); + RETURN_IF_ERROR(handle_searcher_cache(&inverted_index_cache_handle, io_ctx, stats)); auto searcher_variant = inverted_index_cache_handle.get_index_searcher(); bkd_searcher = std::get_if<BKDIndexSearcherPtr>(&searcher_variant); if (bkd_searcher) { @@ -1126,8 +1130,8 @@ Status InvertedIndexIterator::read_from_inverted_index( } } - RETURN_IF_ERROR( - _reader->query(_stats, _runtime_state, column_name, query_value, query_type, bit_map)); + RETURN_IF_ERROR(_reader->query(&_io_ctx, _stats, _runtime_state, column_name, query_value, + query_type, bit_map)); return Status::OK(); } @@ -1141,7 +1145,8 @@ Status InvertedIndexIterator::try_read_from_inverted_index(const std::string& co query_type == InvertedIndexQueryType::LESS_EQUAL_QUERY || query_type == InvertedIndexQueryType::LESS_THAN_QUERY || query_type == InvertedIndexQueryType::EQUAL_QUERY) { - RETURN_IF_ERROR(_reader->try_query(_stats, column_name, query_value, query_type, count)); + RETURN_IF_ERROR( + _reader->try_query(&_io_ctx, _stats, column_name, query_value, query_type, count)); } return Status::OK(); } @@ -1159,4 +1164,5 @@ template class InvertedIndexVisitor<InvertedIndexQueryType::EQUAL_QUERY>; template class InvertedIndexVisitor<InvertedIndexQueryType::LESS_EQUAL_QUERY>; template class InvertedIndexVisitor<InvertedIndexQueryType::GREATER_THAN_QUERY>; template class InvertedIndexVisitor<InvertedIndexQueryType::GREATER_EQUAL_QUERY>; + } // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_reader.h index f37516bc40f..57975ab7fec 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.h @@ -181,17 +181,18 @@ public: virtual ~InvertedIndexReader() = default; // create a new column iterator. Client should delete returned iterator - virtual Status new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, + virtual Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) = 0; - virtual Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state, - const std::string& column_name, const void* query_value, - InvertedIndexQueryType query_type, + virtual Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, const std::string& column_name, + const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<roaring::Roaring>& bit_map) = 0; - virtual Status try_query(OlapReaderStatistics* stats, const std::string& column_name, - const void* query_value, InvertedIndexQueryType query_type, - uint32_t* count) = 0; + virtual Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + const std::string& column_name, const void* query_value, + InvertedIndexQueryType query_type, uint32_t* count) = 0; - Status read_null_bitmap(OlapReaderStatistics* stats, + Status read_null_bitmap(const io::IOContext* io_ctx, OlapReaderStatistics* stats, InvertedIndexQueryCacheHandle* cache_handle, lucene::store::Directory* dir = nullptr); @@ -222,15 +223,15 @@ public: } virtual Status handle_searcher_cache(InvertedIndexCacheHandle* inverted_index_cache_handle, - OlapReaderStatistics* stats); + const io::IOContext* io_ctx, OlapReaderStatistics* stats); std::string get_index_file_path(); static Status create_index_searcher(lucene::store::Directory* dir, IndexSearcherPtr* searcher, MemTracker* mem_tracker, InvertedIndexReaderType reader_type); protected: - Status match_index_search(OlapReaderStatistics* stats, RuntimeState* runtime_state, - InvertedIndexQueryType query_type, + Status match_index_search(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, InvertedIndexQueryType query_type, const InvertedIndexQueryInfo& query_info, const FulltextIndexSearcherPtr& index_searcher, const std::shared_ptr<roaring::Roaring>& term_match_bitmap); @@ -252,15 +253,16 @@ public: : InvertedIndexReader(index_meta, inverted_index_file_reader) {} ~FullTextIndexReader() override = default; - Status new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, + Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) override; - Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state, - const std::string& column_name, const void* query_value, - InvertedIndexQueryType query_type, + Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, const std::string& column_name, + const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<roaring::Roaring>& bit_map) override; - Status try_query(OlapReaderStatistics* stats, const std::string& column_name, - const void* query_value, InvertedIndexQueryType query_type, - uint32_t* count) override { + Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + const std::string& column_name, const void* query_value, + InvertedIndexQueryType query_type, uint32_t* count) override { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>( "FullTextIndexReader not support try_query"); } @@ -278,15 +280,16 @@ public: : InvertedIndexReader(index_meta, inverted_index_file_reader) {} ~StringTypeInvertedIndexReader() override = default; - Status new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, + Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) override; - Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state, - const std::string& column_name, const void* query_value, - InvertedIndexQueryType query_type, + Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, const std::string& column_name, + const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<roaring::Roaring>& bit_map) override; - Status try_query(OlapReaderStatistics* stats, const std::string& column_name, - const void* query_value, InvertedIndexQueryType query_type, - uint32_t* count) override { + Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + const std::string& column_name, const void* query_value, + InvertedIndexQueryType query_type, uint32_t* count) override { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>( "StringTypeInvertedIndexReader not support try_query"); } @@ -337,16 +340,17 @@ public: : InvertedIndexReader(index_meta, inverted_index_file_reader) {} ~BkdIndexReader() override = default; - Status new_iterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, + Status new_iterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::unique_ptr<InvertedIndexIterator>* iterator) override; - Status query(OlapReaderStatistics* stats, RuntimeState* runtime_state, - const std::string& column_name, const void* query_value, - InvertedIndexQueryType query_type, + Status query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, const std::string& column_name, + const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<roaring::Roaring>& bit_map) override; - Status try_query(OlapReaderStatistics* stats, const std::string& column_name, - const void* query_value, InvertedIndexQueryType query_type, - uint32_t* count) override; + Status try_query(const io::IOContext* io_ctx, OlapReaderStatistics* stats, + const std::string& column_name, const void* query_value, + InvertedIndexQueryType query_type, uint32_t* count) override; Status invoke_bkd_try_query(const void* query_value, InvertedIndexQueryType query_type, std::shared_ptr<lucene::util::bkd::bkd_reader> r, uint32_t* count); Status invoke_bkd_query(const void* query_value, InvertedIndexQueryType query_type, @@ -358,7 +362,8 @@ public: InvertedIndexVisitor<QT>* visitor); InvertedIndexReaderType type() override; - Status get_bkd_reader(BKDIndexSearcherPtr& reader, OlapReaderStatistics* stats); + Status get_bkd_reader(BKDIndexSearcherPtr& reader, const io::IOContext* io_ctx, + OlapReaderStatistics* stats); private: const TypeInfo* _type_info {}; @@ -446,9 +451,12 @@ class InvertedIndexIterator { ENABLE_FACTORY_CREATOR(InvertedIndexIterator); public: - InvertedIndexIterator(OlapReaderStatistics* stats, RuntimeState* runtime_state, - std::shared_ptr<InvertedIndexReader> reader) - : _stats(stats), _runtime_state(runtime_state), _reader(std::move(reader)) {} + InvertedIndexIterator(const io::IOContext& io_ctx, OlapReaderStatistics* stats, + RuntimeState* runtime_state, std::shared_ptr<InvertedIndexReader> reader) + : _io_ctx(io_ctx), + _stats(stats), + _runtime_state(runtime_state), + _reader(std::move(reader)) {} Status read_from_inverted_index(const std::string& column_name, const void* query_value, InvertedIndexQueryType query_type, uint32_t segment_num_rows, @@ -459,7 +467,7 @@ public: Status read_null_bitmap(InvertedIndexQueryCacheHandle* cache_handle, lucene::store::Directory* dir = nullptr) { - return _reader->read_null_bitmap(_stats, cache_handle, dir); + return _reader->read_null_bitmap(&_io_ctx, _stats, cache_handle, dir); } [[nodiscard]] InvertedIndexReaderType get_inverted_index_reader_type() const; @@ -469,6 +477,7 @@ public: const InvertedIndexReaderPtr& reader() { return _reader; } private: + io::IOContext _io_ctx; OlapReaderStatistics* _stats = nullptr; RuntimeState* _runtime_state = nullptr; std::shared_ptr<InvertedIndexReader> _reader; diff --git a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp index aed83201a63..0f1b27fd4fa 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_test.cpp @@ -223,7 +223,7 @@ bool query_string(const TabletIndex* index, for (int i = 0; i < query_data.size(); i++) { TQueryOptions queryOptions; auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY, *string_searcher, - queryOptions); + queryOptions, nullptr); EXPECT_TRUE(query != nullptr); InvertedIndexQueryInfo query_info; query_info.field_name = column_name_ws; @@ -253,7 +253,7 @@ bool query_fulltext(const TabletIndex* index, for (int i = 0; i < query_data.size(); i++) { TQueryOptions queryOptions; auto query = QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, *string_searcher, - queryOptions); + queryOptions, nullptr); EXPECT_TRUE(query != nullptr); InvertedIndexQueryInfo query_info; query_info.field_name = column_name_ws; diff --git a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp index 8b5d403fca4..a46f5f210df 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index/compaction/index_compaction_with_deleted_term.cpp @@ -124,7 +124,7 @@ static bool query_string(const TabletIndex* index, for (int i = 0; i < query_data.size(); i++) { TQueryOptions queryOptions; auto query = QueryFactory::create(InvertedIndexQueryType::EQUAL_QUERY, *string_searcher, - queryOptions); + queryOptions, nullptr); EXPECT_TRUE(query != nullptr); InvertedIndexQueryInfo query_info; query_info.field_name = column_name_ws; @@ -155,7 +155,7 @@ static bool query_fulltext(const TabletIndex* index, for (int i = 0; i < query_data.size(); i++) { TQueryOptions queryOptions; auto query = QueryFactory::create(InvertedIndexQueryType::MATCH_ANY_QUERY, *string_searcher, - queryOptions); + queryOptions, nullptr); EXPECT_TRUE(query != nullptr); InvertedIndexQueryInfo query_info; query_info.field_name = column_name_ws; diff --git a/regression-test/data/fault_injection_p0/test_index_io_context.out b/regression-test/data/fault_injection_p0/test_index_io_context.out new file mode 100644 index 00000000000..3dc2880233e --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_index_io_context.out @@ -0,0 +1,73 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +177 + +-- !sql -- +177 + +-- !sql -- +177 + +-- !sql -- +177 + +-- !sql -- +177 + +-- !sql -- +177 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + +-- !sql -- +2 + diff --git a/regression-test/suites/fault_injection_p0/test_index_io_context.groovy b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy new file mode 100644 index 00000000000..9e9a2674897 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_index_io_context.groovy @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_index_io_context", "nonConcurrent") { + def tableName1 = "test_index_io_context1" + def tableName2 = "test_index_io_context2" + + def create_table = {table_name, index_format -> + sql """ DROP TABLE IF EXISTS ${table_name} """ + sql """ + CREATE TABLE IF NOT EXISTS ${table_name} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '' + ) + DISTRIBUTED BY HASH(`@timestamp`) PROPERTIES( + "replication_num" = "1", + "disable_auto_compaction" = "true", + "inverted_index_storage_format" = "${index_format}" + ); + """ + } + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1, load_to_single_tablet = 'true' -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + } + } + } + + try { + create_table(tableName1, "V1"); + create_table(tableName2, "V2"); + + load_httplogs_data.call(tableName1, 'test_index_io_context1', 'true', 'json', 'documents-1000.json') + load_httplogs_data.call(tableName2, 'test_index_io_context2', 'true', 'json', 'documents-1000.json') + + sql "sync" + sql """ set enable_common_expr_pushdown = true; """ + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexInput::readInternal") + + qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_any 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_any 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_any 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_any 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_all 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_all 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_all 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_all 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_all 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_all 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_phrase 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_phrase 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_phrase 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg'; """ + qt_sql """ select count() from ${tableName1} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ + qt_sql """ select count() from ${tableName1} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ + qt_sql """ select count() from ${tableName1} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ + qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ + qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ + qt_sql """ select count() from ${tableName2} where request match_phrase 'ticket_quest_bg2.jpg ~10+'; """ + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexInput::readInternal") + } + } finally { + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org