This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 4d341c06667 branch-4.0: [opt](inverted index) optimize S3 operations for inverted index #59363 (#59511)
4d341c06667 is described below
commit 4d341c066671d9ed80ad4969079306fecd179d85
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 5 11:45:43 2026 +0800
branch-4.0: [opt](inverted index) optimize S3 operations for inverted index #59363 (#59511)
Cherry-picked from #59363
Co-authored-by: zzzxl <[email protected]>
---
be/src/olap/collection_statistics.cpp | 68 +++++++++++++------
be/src/olap/collection_statistics.h | 12 +++-
be/src/pipeline/exec/olap_scan_operator.cpp | 1 +
be/src/pipeline/exec/olap_scan_operator.h | 1 +
be/src/vec/exec/scan/olap_scanner.cpp | 12 +++-
be/test/olap/collection_statistics_test.cpp | 31 +++++----
.../test_inverted_index_collection_stats.groovy | 77 ++++++++++++++++++++++
7 files changed, 166 insertions(+), 36 deletions(-)
diff --git a/be/src/olap/collection_statistics.cpp
b/be/src/olap/collection_statistics.cpp
index 9f0c07abe60..714a19fe6b7 100644
--- a/be/src/olap/collection_statistics.cpp
+++ b/be/src/olap/collection_statistics.cpp
@@ -17,6 +17,8 @@
#include "collection_statistics.h"
+#include <sstream>
+
#include "common/exception.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
@@ -35,21 +37,22 @@ namespace doris {
Status CollectionStatistics::collect(
RuntimeState* state, const std::vector<RowSetSplits>& rs_splits,
const TabletSchemaSPtr& tablet_schema,
- const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down) {
+ const vectorized::VExprContextSPtrs& common_expr_ctxs_push_down,
io::IOContext* io_ctx) {
std::unordered_map<std::wstring, CollectInfo> collect_infos;
RETURN_IF_ERROR(
extract_collect_info(state, common_expr_ctxs_push_down,
tablet_schema, &collect_infos));
+ if (collect_infos.empty()) {
+ LOG(WARNING) << "Index statistics collection: no collect info
extracted.";
+ return Status::OK();
+ }
for (const auto& rs_split : rs_splits) {
const auto& rs_reader = rs_split.rs_reader;
auto rowset = rs_reader->rowset();
- auto rowset_meta = rowset->rowset_meta();
-
auto num_segments = rowset->num_segments();
for (int32_t seg_id = 0; seg_id < num_segments; ++seg_id) {
- auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
- auto status = process_segment(seg_path, rowset_meta->fs(),
tablet_schema.get(),
- collect_infos);
+ auto status =
+ process_segment(rowset, seg_id, tablet_schema.get(),
collect_infos, io_ctx);
if (!status.ok()) {
if (status.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND
||
status.code() == ErrorCode::INVERTED_INDEX_BYPASS) {
@@ -62,15 +65,29 @@ Status CollectionStatistics::collect(
}
#ifndef NDEBUG
- LOG(INFO) << "term_num_docs: " << _total_num_docs;
+ std::stringstream ss;
+ ss << "term_num_docs: " << _total_num_docs;
for (const auto& [ws_field_name, num_tokens] : _total_num_tokens) {
- LOG(INFO) << "field_name: " << StringHelper::to_string(ws_field_name)
- << ", num_tokens: " << num_tokens;
- for (const auto& [term, doc_freq] : _term_doc_freqs.at(ws_field_name))
{
- LOG(INFO) << "term: " << StringHelper::to_string(term) << ",
doc_freq: " << doc_freq;
+ ss << ", [field_name: " << StringHelper::to_string(ws_field_name)
+ << ", num_tokens: " << num_tokens;
+ auto it = _term_doc_freqs.find(ws_field_name);
+ if (it != _term_doc_freqs.end()) {
+ ss << ", terms: {";
+ bool first = true;
+ for (const auto& [term, doc_freq] : it->second) {
+ if (!first) {
+ ss << ", ";
+ }
+ ss << StringHelper::to_string(term) << ": " << doc_freq;
+ first = false;
+ }
+ ss << "}";
+ } else {
+ ss << ", (no term stats)";
}
+ ss << "]";
}
- LOG(INFO) << "--------------------------------";
+ LOG(INFO) << "CollectionStatistics: " << ss.str();
#endif
return Status::OK();
@@ -136,6 +153,11 @@ Status handle_match_pred(RuntimeState* state, const
TabletSchemaSPtr& tablet_sch
auto term_infos = InvertedIndexAnalyzer::get_analyse_result(
right_literal->value(format_options),
index_meta->properties());
+ if (term_infos.empty()) {
+ LOG(WARNING) << "Index statistics collection: no terms extracted
from literal value, "
+ << "col_unique_id=" <<
index_meta->col_unique_ids()[0];
+ continue;
+ }
std::string field_name =
std::to_string(index_meta->col_unique_ids()[0]);
if (!column.suffix_path().empty()) {
@@ -188,18 +210,22 @@ Status CollectionStatistics::extract_collect_info(
}
Status CollectionStatistics::process_segment(
- const std::string& seg_path, const io::FileSystemSPtr& fs,
- const TabletSchema* tablet_schema,
- const std::unordered_map<std::wstring, CollectInfo>& collect_infos) {
+ const RowsetSharedPtr& rowset, int32_t seg_id, const TabletSchema*
tablet_schema,
+ const std::unordered_map<std::wstring, CollectInfo>& collect_infos,
io::IOContext* io_ctx) {
+ auto seg_path = DORIS_TRY(rowset->segment_path(seg_id));
+ auto rowset_meta = rowset->rowset_meta();
+
auto idx_file_reader = std::make_unique<IndexFileReader>(
- fs, std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
- tablet_schema->get_inverted_index_storage_format());
- RETURN_IF_ERROR(idx_file_reader->init());
+ rowset_meta->fs(),
+ std::string
{InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
+ tablet_schema->get_inverted_index_storage_format(),
+ rowset_meta->inverted_index_file_info(seg_id));
+
RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size,
io_ctx));
int32_t total_seg_num_docs = 0;
for (const auto& [ws_field_name, collect_info] : collect_infos) {
#ifdef BE_TEST
- auto compound_reader =
DORIS_TRY(idx_file_reader->open(collect_info.index_meta, nullptr));
+ auto compound_reader =
DORIS_TRY(idx_file_reader->open(collect_info.index_meta, io_ctx));
auto* reader = lucene::index::IndexReader::open(compound_reader.get());
auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(reader, true);
@@ -211,7 +237,7 @@ Status CollectionStatistics::process_segment(
if (!InvertedIndexSearcherCache::instance()->lookup(searcher_cache_key,
&inverted_index_cache_handle)) {
auto compound_reader =
- DORIS_TRY(idx_file_reader->open(collect_info.index_meta,
nullptr));
+ DORIS_TRY(idx_file_reader->open(collect_info.index_meta,
io_ctx));
auto* reader =
lucene::index::IndexReader::open(compound_reader.get());
size_t reader_size = reader->getTermInfosRAMUsed();
auto index_searcher =
std::make_shared<lucene::search::IndexSearcher>(reader, true);
@@ -231,7 +257,7 @@ Status CollectionStatistics::process_segment(
index_reader->sumTotalTermFreq(ws_field_name.c_str()).value_or(0);
for (const auto& term_info : collect_info.term_infos) {
- auto iter = TermIterator::create(nullptr, false, index_reader,
ws_field_name,
+ auto iter = TermIterator::create(io_ctx, false, index_reader,
ws_field_name,
term_info.get_single_term());
_term_doc_freqs[ws_field_name][iter->term()] += iter->doc_freq();
}
diff --git a/be/src/olap/collection_statistics.h
b/be/src/olap/collection_statistics.h
index 87a91c755a3..9fdd3ddde30 100644
--- a/be/src/olap/collection_statistics.h
+++ b/be/src/olap/collection_statistics.h
@@ -32,10 +32,14 @@ namespace doris {
namespace io {
class FileSystem;
using FileSystemSPtr = std::shared_ptr<FileSystem>;
+struct IOContext;
} // namespace io
struct RowSetSplits;
+class Rowset;
+using RowsetSharedPtr = std::shared_ptr<Rowset>;
+
class TabletIndex;
class TabletSchema;
using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;
@@ -59,7 +63,8 @@ public:
Status collect(RuntimeState* state, const std::vector<RowSetSplits>&
rs_splits,
const TabletSchemaSPtr& tablet_schema,
- const vectorized::VExprContextSPtrs&
common_expr_ctxs_push_down);
+ const vectorized::VExprContextSPtrs&
common_expr_ctxs_push_down,
+ io::IOContext* io_ctx);
MOCK_FUNCTION float get_or_calculate_idf(const std::wstring&
lucene_col_name,
const std::wstring& term);
@@ -70,9 +75,10 @@ private:
const vectorized::VExprContextSPtrs&
common_expr_ctxs_push_down,
const TabletSchemaSPtr& tablet_schema,
std::unordered_map<std::wstring, CollectInfo>*
collect_infos);
- Status process_segment(const std::string& seg_path, const
io::FileSystemSPtr& fs,
+ Status process_segment(const RowsetSharedPtr& rowset, int32_t seg_id,
const TabletSchema* tablet_schema,
- const std::unordered_map<std::wstring,
CollectInfo>& collect_infos);
+ const std::unordered_map<std::wstring,
CollectInfo>& collect_infos,
+ io::IOContext* io_ctx);
uint64_t get_term_doc_freq_by_col(const std::wstring& lucene_col_name,
const std::wstring& term);
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 2cf9167d659..53b3d7e76d9 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -196,6 +196,7 @@ Status OlapScanLocalState::_init_profile() {
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum",
TUnit::UNIT);
_cached_pages_num_counter = ADD_COUNTER(_segment_profile,
"CachedPagesNum", TUnit::UNIT);
+ _statistics_collect_timer = ADD_TIMER(_scanner_profile,
"StatisticsCollectTime");
_inverted_index_filter_counter =
ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered",
TUnit::UNIT);
_inverted_index_filter_timer = ADD_TIMER(_segment_profile,
"InvertedIndexFilterTime");
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index bc208028a48..c97f71a0113 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -188,6 +188,7 @@ private:
// used by segment v2
RuntimeProfile::Counter* _cached_pages_num_counter = nullptr;
+ RuntimeProfile::Counter* _statistics_collect_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_filter_counter = nullptr;
RuntimeProfile::Counter* _inverted_index_filter_timer = nullptr;
RuntimeProfile::Counter* _inverted_index_query_null_bitmap_timer = nullptr;
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp
b/be/src/vec/exec/scan/olap_scanner.cpp
index 3ecb1063f1c..fa808121a08 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -289,10 +289,20 @@ Status OlapScanner::prepare() {
}
if (_tablet_reader_params.score_runtime) {
+ SCOPED_TIMER(local_state->_statistics_collect_timer);
_tablet_reader_params.collection_statistics =
std::make_shared<CollectionStatistics>();
+
+ io::IOContext io_ctx {
+ .reader_type = ReaderType::READER_QUERY,
+ .expiration_time = tablet->ttl_seconds(),
+ .query_id = &_state->query_id(),
+ .file_cache_stats =
&_tablet_reader->mutable_stats()->file_cache_stats,
+ .is_inverted_index = true,
+ };
+
RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect(
_state, _tablet_reader_params.rs_splits,
_tablet_reader_params.tablet_schema,
- _tablet_reader_params.common_expr_ctxs_push_down));
+ _tablet_reader_params.common_expr_ctxs_push_down, &io_ctx));
}
_has_prepared = true;
diff --git a/be/test/olap/collection_statistics_test.cpp
b/be/test/olap/collection_statistics_test.cpp
index 7988f842161..c5b9b7054e7 100644
--- a/be/test/olap/collection_statistics_test.cpp
+++ b/be/test/olap/collection_statistics_test.cpp
@@ -327,7 +327,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithEmptyRowsetSplits) {
std::vector<RowSetSplits> empty_splits;
- auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, expr_contexts);
+ auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, expr_contexts,
+ nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -337,8 +338,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithEmptyExpressions) {
std::vector<RowSetSplits> empty_splits;
- auto status =
- stats_->collect(runtime_state_.get(), empty_splits, tablet_schema,
empty_contexts);
+ auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, empty_contexts,
+ nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -353,7 +354,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithNonMatchExpression) {
std::vector<RowSetSplits> empty_splits;
- auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), empty_splits, tablet_schema,
contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -380,7 +382,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithMultipleMatchExpressions) {
std::vector<RowSetSplits> empty_splits;
- auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), empty_splits, tablet_schema,
contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -407,7 +410,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithNestedExpressions) {
std::vector<RowSetSplits> empty_splits;
- auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), empty_splits, tablet_schema,
contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -417,7 +421,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithMockRowsetSplits) {
auto splits = create_mock_rowset_splits(2);
- auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema,
expr_contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), splits, tablet_schema,
expr_contexts, nullptr);
EXPECT_TRUE(status.ok());
}
@@ -428,7 +433,8 @@ TEST_F(CollectionStatisticsTest, CollectWithEmptySegments) {
auto splits = create_mock_rowset_splits(0);
- auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema,
expr_contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), splits, tablet_schema,
expr_contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -450,7 +456,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithMultipleRowsetSplits) {
splits.push_back(split);
}
- auto status = stats_->collect(runtime_state_.get(), splits, tablet_schema,
expr_contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), splits, tablet_schema,
expr_contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -581,7 +588,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithCastWrappedSlotRef) {
contexts.push_back(std::make_shared<vectorized::VExprContext>(match_expr));
std::vector<RowSetSplits> empty_splits;
- auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), empty_splits, tablet_schema,
contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
@@ -605,7 +613,8 @@ TEST_F(CollectionStatisticsTest,
CollectWithDoubleCastWrappedSlotRef) {
contexts.push_back(std::make_shared<vectorized::VExprContext>(match_expr));
std::vector<RowSetSplits> empty_splits;
- auto status = stats_->collect(runtime_state_.get(), empty_splits,
tablet_schema, contexts);
+ auto status =
+ stats_->collect(runtime_state_.get(), empty_splits, tablet_schema,
contexts, nullptr);
EXPECT_TRUE(status.ok()) << status.msg();
}
diff --git
a/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy
b/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy
new file mode 100644
index 00000000000..2b118436c07
--- /dev/null
+++
b/regression-test/suites/inverted_index_p0/test_inverted_index_collection_stats.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.regex.Pattern
+
+suite('test_inverted_index_collection_stats', 'p0') {
+ def indexTbName1 = "test_inverted_index_collection_stats_tbl"
+
+ sql "DROP TABLE IF EXISTS ${indexTbName1}"
+
+ sql """
+ CREATE TABLE ${indexTbName1} (
+ `id` int(11) NULL COMMENT "",
+ `content` text NULL COMMENT "",
+ INDEX content_idx (`content`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """ INSERT INTO ${indexTbName1} VALUES (1, 'hello world'), (2, 'hello
doris'), (3, 'doris is great') """
+
+ sql "sync"
+
+ // Enable profile
+ sql """ set enable_profile = true; """
+ sql """ set profile_level = 2; """
+ sql """ set enable_common_expr_pushdown = true; """
+ sql """ set enable_common_expr_pushdown_for_inverted_index = true; """
+
+ // Execute MATCH_ALL query which triggers CollectionStatistics::collect
+ def queryId =
"test_inverted_index_collection_stats_${System.currentTimeMillis()}"
+ try {
+ profile("${queryId}") {
+ run {
+ sql "/* ${queryId} */ select score() as score from
${indexTbName1} where content match_all 'hello' order by score desc limit 10"
+ }
+
+ check { profileString, exception ->
+ def statisticsCollectTime = 0
+ def matcher =
Pattern.compile("StatisticsCollectTime:\\s*(\\d+)").matcher(profileString)
+ if (matcher.find()) {
+ statisticsCollectTime = Integer.parseInt(matcher.group(1))
+ log.info("StatisticsCollectTime: {}",
statisticsCollectTime)
+ }
+ assertTrue(statisticsCollectTime > 0, "StatisticsCollectTime
should be > 0, got: ${statisticsCollectTime}")
+ }
+ }
+ } catch (Exception e) {
+ if (e.message?.contains("HttpCliAction failed")) {
+ log.warn("Profile HTTP request failed, skipping profile check:
{}", e.message)
+ } else {
+ log.warn("Profile check failed: {}", e.message)
+ throw e
+ }
+ } finally {
+ // sql "DROP TABLE IF EXISTS ${indexTbName1}"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]