This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 4d341c06667 branch-4.0: [opt](inverted index) optimize S3 operations for inverted index #59363 (#59511)
4d341c06667 is described below

commit 4d341c066671d9ed80ad4969079306fecd179d85
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 5 11:45:43 2026 +0800

    branch-4.0: [opt](inverted index) optimize S3 operations for inverted index #59363 (#59511)
    
    Cherry-picked from #59363
    
    Co-authored-by: zzzxl <[email protected]>
---
 be/src/olap/collection_statistics.cpp              | 68 +++++++++++++------
 be/src/olap/collection_statistics.h                | 12 +++-
 be/src/pipeline/exec/olap_scan_operator.cpp        |  1 +
 be/src/pipeline/exec/olap_scan_operator.h          |  1 +
 be/src/vec/exec/scan/olap_scanner.cpp              | 12 +++-
 be/test/olap/collection_statistics_test.cpp        | 31 +++++----
 .../test_inverted_index_collection_stats.groovy    | 77 ++++++++++++++++++++++
 7 files changed, 166 insertions(+), 36 deletions(-)

diff --git a/be/src/olap/collection_statistics.cpp b/be/src/olap/collection_statistics.cpp
index 9f0c07abe60..714a19fe6b7 100644
--- a/be/src/olap/collection_statistics.cpp
+++ b/be/src/olap/collection_statistics.cpp
@@ -17,6 +17,8 @@
 
 #include "collection_statistics.h"
 
+#include <sstream>
+
 #include "common/exception.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
@@ -35,21 +37,22 @@ namespace doris {
 Status CollectionStatistics::collect(
         RuntimeState* state, const std::vector<RowSetSplits>& rs_splits,
         const TabletSchemaSPtr& tablet_schema,
-        const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down) {
+        const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down, io::IOContext* io_ctx) {
     std::unordered_map<std::wstring, CollectInfo> collect_infos;
     RETURN_IF_ERROR(
             extract_collect_info(state, common_expr_ctxs_push_down, tablet_schema, &collect_infos));
+    if (collect_infos.empty()) {
+        LOG(WARNING) << "Index statistics collection: no collect info extracted.";
+        return Status::OK();
+    }
 
     for (const auto& rs_split : rs_splits) {
         const auto& rs_reader = rs_split.rs_reader;
         auto rowset = rs_reader->rowset();
-        auto rowset_meta = rowset->rowset_meta();
-
         auto num_segments = rowset->num_segments();
         for (int32_t seg_id = 0; seg_id < num_segments; ++seg_id) {
-            auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
-            auto status = process_segment(seg_path, rowset_meta->fs(), tablet_schema.get(),
-                                          collect_infos);
+            auto status =
+                    process_segment(rowset, seg_id, tablet_schema.get(), collect_infos, io_ctx);
             if (!status.ok()) {
                 if (status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND ||
                     status.code() == ErrorCode::INVERTED_INDEX_BYPASS) {
@@ -62,15 +65,29 @@ Status CollectionStatistics::collect(
     }
 
 #ifndef NDEBUG
-    LOG(INFO) << "term_num_docs: " << _total_num_docs;
+    std::stringstream ss;
+    ss << "term_num_docs: " << _total_num_docs;
     for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) {
-        LOG(INFO) << "field_name: " << StringHelper::to_string(ws_field_name)
-                  << ", num_tokens: " << num_tokens;
-        for (const auto& [term, doc_freq] : _term_doc_freqs.at(ws_field_name)) {
-            LOG(INFO) << "term: " << StringHelper::to_string(term) << ", doc_freq: " << doc_freq;
+        ss << ", [field_name: " << StringHelper::to_string(ws_field_name)
+           << ", num_tokens: " << num_tokens;
+        auto it = _term_doc_freqs.find(ws_field_name);
+        if (it != _term_doc_freqs.end()) {
+            ss << ", terms: {";
+            bool first = true;
+            for (const auto& [term, doc_freq] : it->second) {
+                if (!first) {
+                    ss << ", ";
+                }
+                ss << StringHelper::to_string(term) << ": " << doc_freq;
+                first = false;
+            }
+            ss << "}";
+        } else {
+            ss << ", (no term stats)";
         }
+        ss << "]";
     }
-    LOG(INFO) << "--------------------------------";
+    LOG(INFO) << "CollectionStatistics: " << ss.str();
 #endif
 
     return Status::OK();
@@ -136,6 +153,11 @@ Status handle_match_pred(RuntimeState* state, const TabletSchemaSPtr& tablet_sch
 
         auto term_infos = InvertedIndexAnalyzer::get_analyse_result(
                 right_literal->value(format_options), index_meta->properties());
+        if (term_infos.empty()) {
+            LOG(WARNING) << "Index statistics collection: no terms extracted from literal value, "
+                         << "col_unique_id=" << index_meta->col_unique_ids()[0];
+            continue;
+        }
 
         std::string field_name = std::to_string(index_meta->col_unique_ids()[0]);
         if (!column.suffix_path().empty()) {
@@ -188,18 +210,22 @@ Status CollectionStatistics::extract_collect_info(
 }
 
 Status CollectionStatistics::process_segment(
-        const std::string& seg_path, const io::FileSystemSPtr& fs,
-        const TabletSchema* tablet_schema,
-        const std::unordered_map<std::wstring, CollectInfo>& collect_infos) {
+        const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema* tablet_schema,
+        const std::unordered_map<std::wstring, CollectInfo>& collect_infos, io::IOContext* io_ctx) {
+    auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
+    auto rowset_meta = rowset->rowset_meta();
+
     auto idx_file_reader = std::make_unique<IndexFileReader>(
-            fs, std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
-            tablet_schema->get_inverted_index_storage_format());
-    RETURN_IF_ERROR(idx_file_reader->init());
+            rowset_meta->fs(),
+            std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
+            tablet_schema->get_inverted_index_storage_format(),
+            rowset_meta->inverted_index_file_info(seg_id));
+    RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx));
 
     int32_t total_seg_num_docs = 0;
     for (const auto& [ws_field_name, collect_info] : collect_infos) {
 #ifdef BE_TEST
-        auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr));
+        auto compound_reader = DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
         auto* reader = lucene::index::IndexReader::open(compound_reader.get());
         auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);
 
@@ -211,7 +237,7 @@ Status CollectionStatistics::process_segment(
         if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
                                                             &inverted_index_cache_handle)) {
             auto compound_reader =
-                    DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr));
+                    DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
             auto* reader = lucene::index::IndexReader::open(compound_reader.get());
             size_t reader_size = reader->getTermInfosRAMUsed();
             auto index_searcher = std::make_shared<lucene::search::IndexSearcher>(reader, true);
@@ -231,7 +257,7 @@ Status CollectionStatistics::process_segment(
                 index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0);
 
         for (const auto& term_info : collect_info.term_infos) {
-            auto iter = TermIterator::create(nullptr, false, index_reader, ws_field_name,
+            auto iter = TermIterator::create(io_ctx, false, index_reader, ws_field_name,
                                              term_info.get_single_term());
             _term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq();
         }
diff --git a/be/src/olap/collection_statistics.h b/be/src/olap/collection_statistics.h
index 87a91c755a3..9fdd3ddde30 100644
--- a/be/src/olap/collection_statistics.h
+++ b/be/src/olap/collection_statistics.h
@@ -32,10 +32,14 @@ namespace doris {
 namespace io {
 class FileSystem;
 using FileSystemSPtr = std::shared_ptr<FileSystem>;
+struct IOContext;
 } // namespace io
 
 struct RowSetSplits;
 
+class Rowset;
+using RowsetSharedPtr = std::shared_ptr<Rowset>;
+
 class TabletIndex;
 class TabletSchema;
 using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;
@@ -59,7 +63,8 @@ public:
 
     Status collect(RuntimeState* state, const std::vector<RowSetSplits>& rs_splits,
                    const TabletSchemaSPtr& tablet_schema,
-                   const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down);
+                   const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
+                   io::IOContext* io_ctx);
 
     MOCK_FUNCTION float get_or_calculate_idf(const std::wstring& lucene_col_name,
                                              const std::wstring& term);
@@ -70,9 +75,10 @@ private:
                                 const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
                                 const TabletSchemaSPtr& tablet_schema,
                                 std::unordered_map<std::wstring, CollectInfo>* collect_infos);
-    Status process_segment(const std::string& seg_path, const io::FileSystemSPtr& fs,
+    Status process_segment(const RowsetSharedPtr& rowset, int32_t seg_id,
                            const TabletSchema* tablet_schema,
-                           const std::unordered_map<std::wstring, CollectInfo>& collect_infos);
+                           const std::unordered_map<std::wstring, CollectInfo>& collect_infos,
+                           io::IOContext* io_ctx);
 
     uint64_t get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
                                       const std::wstring& term);
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index 2cf9167d659..53b3d7e76d9 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -196,6 +196,7 @@ Status OlapScanLocalState::_init_profile() {
     _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
     _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
 
+    _statistics_collect_timer = ADD_TIMER(_scanner_profile, "StatisticsCollectTime");
     _inverted_index_filter_counter =
             ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT);
     _inverted_index_filter_timer = ADD_TIMER(_segment_profile, "InvertedIndexFilterTime");
diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h
index bc208028a48..c97f71a0113 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -188,6 +188,7 @@ private:
     // used by segment v2
     RuntimeProfile::Counter* _cached_pages_num_counter = nullptr;
 
+    RuntimeProfile::Counter* _statistics_collect_timer = nullptr;
     RuntimeProfile::Counter* _inverted_index_filter_counter = nullptr;
     RuntimeProfile::Counter* _inverted_index_filter_timer = nullptr;
     RuntimeProfile::Counter* _inverted_index_query_null_bitmap_timer = nullptr;
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp
index 3ecb1063f1c..fa808121a08 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -289,10 +289,20 @@ Status OlapScanner::prepare() {
     }
 
     if (_tablet_reader_params.score_runtime) {
+        SCOPED_TIMER(local_state->_statistics_collect_timer);
+        _tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>();
+
+        io::IOContext io_ctx {
+                .reader_type = ReaderType::READER_QUERY,
+                .expiration_time = tablet->ttl_seconds(),
+                .query_id = &_state->query_id(),
+                .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
+                .is_inverted_index = true,
+        };
+
         RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
                 _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema,
-                _tablet_reader_params.common_expr_ctxs_push_down));
+                _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx));
     }
 
     _has_prepared = true;
diff --git a/be/test/olap/collection_statistics_test.cpp b/be/test/olap/collection_statistics_test.cpp
index 7988f842161..c5b9b7054e7 100644
--- a/be/test/olap/collection_statistics_test.cpp
+++ b/be/test/olap/collection_statistics_test.cpp
@@ -327,7 +327,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptyRowsetSplits) {
 
     std::vector<RowSetSplits> empty_splits;
 
-    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, expr_contexts);
+    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, expr_contexts,
+                                  nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -337,8 +338,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptyExpressions) {
 
     std::vector<RowSetSplits> empty_splits;
 
-    auto status =
-            stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, empty_contexts);
+    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, empty_contexts,
+                                  nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -353,7 +354,8 @@ TEST_F(CollectionStatisticsTest, CollectWithNonMatchExpression) {
 
     std::vector<RowSetSplits> empty_splits;
 
-    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -380,7 +382,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMultipleMatchExpressions) {
 
     std::vector<RowSetSplits> empty_splits;
 
-    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -407,7 +410,8 @@ TEST_F(CollectionStatisticsTest, CollectWithNestedExpressions) {
 
     std::vector<RowSetSplits> empty_splits;
 
-    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -417,7 +421,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMockRowsetSplits) {
 
     auto splits = create_mock_rowset_splits(2);
 
-    auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr);
 
     EXPECT_TRUE(status.ok());
 }
@@ -428,7 +433,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptySegments) {
 
     auto splits = create_mock_rowset_splits(0);
 
-    auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -450,7 +456,8 @@ TEST_F(CollectionStatisticsTest, CollectWithMultipleRowsetSplits) {
         splits.push_back(split);
     }
 
-    auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), splits, tablet_schema, expr_contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -581,7 +588,8 @@ TEST_F(CollectionStatisticsTest, CollectWithCastWrappedSlotRef) {
     contexts.push_back(std::make_shared<vectorized::VExprContext>(match_expr));
 
     std::vector<RowSetSplits> empty_splits;
-    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
@@ -605,7 +613,8 @@ TEST_F(CollectionStatisticsTest, CollectWithDoubleCastWrappedSlotRef) {
     contexts.push_back(std::make_shared<vectorized::VExprContext>(match_expr));
 
     std::vector<RowSetSplits> empty_splits;
-    auto status = stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts);
+    auto status =
+            stats_->collect(runtime_state_.get(), empty_splits, tablet_schema, contexts, nullptr);
     EXPECT_TRUE(status.ok()) << status.msg();
 }
 
diff --git a/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy b/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy
new file mode 100644
index 00000000000..2b118436c07
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.regex.Pattern
+
+suite('test_inverted_index_collection_stats', 'p0') {
+    def indexTbName1 = "test_inverted_index_collection_stats_tbl"
+    
+    sql "DROP TABLE IF EXISTS ${indexTbName1}"
+    
+    sql """
+      CREATE TABLE ${indexTbName1} (
+      `id` int(11) NULL COMMENT "",
+      `content` text NULL COMMENT "",
+      INDEX content_idx (`content`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
+      ) ENGINE=OLAP
+      DUPLICATE KEY(`id`)
+      COMMENT "OLAP"
+      DISTRIBUTED BY RANDOM BUCKETS 1
+      PROPERTIES (
+      "replication_allocation" = "tag.location.default: 1"
+      );
+    """
+    
+    sql """ INSERT INTO ${indexTbName1} VALUES (1, 'hello world'), (2, 'hello 
doris'), (3, 'doris is great') """
+    
+    sql "sync"
+    
+    // Enable profile
+    sql """ set enable_profile = true; """
+    sql """ set profile_level = 2; """
+    sql """ set enable_common_expr_pushdown = true; """
+    sql """ set enable_common_expr_pushdown_for_inverted_index = true; """
+    
+    // Execute MATCH_ALL query which triggers CollectionStatistics::collect
+    def queryId = "test_inverted_index_collection_stats_${System.currentTimeMillis()}"
+    try {
+        profile("${queryId}") {
+            run {
+                sql "/* ${queryId} */ select score() as score from 
${indexTbName1} where content match_all 'hello' order by score desc limit 10"
+            }
+            
+            check { profileString, exception ->
+                def statisticsCollectTime = 0
+                def matcher = Pattern.compile("StatisticsCollectTime:\\s*(\\d+)").matcher(profileString)
+                if (matcher.find()) {
+                    statisticsCollectTime = Integer.parseInt(matcher.group(1))
+                    log.info("StatisticsCollectTime: {}", statisticsCollectTime)
+                }
+                assertTrue(statisticsCollectTime > 0, "StatisticsCollectTime should be > 0, got: ${statisticsCollectTime}")
+            }
+        }
+    } catch (Exception e) {
+        if (e.message?.contains("HttpCliAction failed")) {
+            log.warn("Profile HTTP request failed, skipping profile check: {}", e.message)
+        } else {
+            log.warn("Profile check failed: {}", e.message)
+            throw e
+        }
+    } finally {
+        // sql "DROP TABLE IF EXISTS ${indexTbName1}"
+    }
+}
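
For readers skimming the patch: the caller-facing change is that CollectionStatistics::collect() now takes an io::IOContext*, which it forwards to IndexFileReader::init()/open() and TermIterator::create(). Below is a minimal caller-side sketch; it mirrors the olap_scanner.cpp hunk above, and the surrounding members (_state, tablet, _tablet_reader, _tablet_reader_params) are assumed to exist as in OlapScanner, so treat it as illustrative rather than drop-in code.

    // Sketch only -- field values and call shape taken from the hunks above.
    auto stats = std::make_shared<CollectionStatistics>();

    io::IOContext io_ctx {
            .reader_type = ReaderType::READER_QUERY,
            .expiration_time = tablet->ttl_seconds(),
            .query_id = &_state->query_id(),
            .file_cache_stats = &_tablet_reader->mutable_stats()->file_cache_stats,
            .is_inverted_index = true,
    };

    // collect() passes io_ctx down to IndexFileReader::init()/open() and
    // TermIterator::create(), so the index reads issued while gathering
    // collection statistics carry the same IO context (reader type, query id,
    // file cache stats) as regular inverted index queries.
    RETURN_IF_ERROR(stats->collect(_state, _tablet_reader_params.rs_splits,
                                   _tablet_reader_params.tablet_schema,
                                   _tablet_reader_params.common_expr_ctxs_push_down,
                                   &io_ctx));

The StatisticsCollectTime timer added in olap_scan_operator.cpp/.h wraps this collection step, which is what the new Groovy regression test asserts on.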

