This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 26aacf428d2 [opt](ann index) omp_threads_limit working better & minor
change for observability (#56796)
26aacf428d2 is described below
commit 26aacf428d2dfa2bb2ca38309e3de0dd708057ab
Author: zhiqiang <[email protected]>
AuthorDate: Sat Oct 18 16:37:56 2025 +0800
[opt](ann index) omp_threads_limit working better & minor change for
observability (#56796)
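This pull request introduces a new mechanism for managing OpenMP thread
usage during concurrent FAISS vector index builds in Doris, improving
resource control and stability. It adds a global thread budget guard that
keeps the total number of threads used by concurrent builds within the
configurable `omp_threads_limit` on a best-effort basis (each build
reserves half of the remaining budget, but always at least one thread),
and adds the `ann_index_build_index_threads` metric for monitoring thread
usage. The default of `omp_threads_limit` changes from 8 to -1, meaning
auto: 80% of the detected CPU cores, capped at the core count.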
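Additionally, the building thread is temporarily renamed (`faiss_train_idx`
or `faiss_build_idx`) during the FAISS train and add phases for easier
debugging, and `add()` now processes vectors in chunks of 1M rows so the
thread budget is re-evaluated between chunks.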
---
be/src/common/config.cpp | 19 +++--
be/src/common/config.h | 3 +-
.../segment_v2/ann_index/faiss_ann_index.cpp | 90 ++++++++++++++++++++--
be/src/util/doris_metrics.cpp | 2 +
be/src/util/doris_metrics.h | 1 +
5 files changed, 103 insertions(+), 12 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c2ce46bb06e..32683dc132e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1585,11 +1585,20 @@
DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
// The maximum csv line reader output buffer size
DEFINE_mInt64(max_csv_line_reader_output_buffer_size, "4294967296");
-// Maximum number of openmp threads can be used by each doris threads.
-// This configuration controls the parallelism level for OpenMP operations
within Doris,
-// helping to prevent resource contention and ensure stable performance when
multiple
-// Doris threads are executing OpenMP-accelerated operations simultaneously.
-DEFINE_mInt32(omp_threads_limit, "8");
+// Maximum number of OpenMP threads allowed for concurrent vector index builds.
+// -1 (any negative value) means auto: 80% of the available CPU cores; the result is clamped to [1, cores].
+DEFINE_Int32(omp_threads_limit, "-1"); // NOTE(review): was DEFINE_mInt32 with default 8 before this change; confirm moving away from the m-variant is intended.
+DEFINE_Validator(omp_threads_limit, [](const int config) -> bool { // Normalizes the value in place and always returns true.
+ CpuInfo::init(); // NOTE(review): assumed idempotent; verify it is safe to call during config validation.
+ int core_cap = config::num_cores > 0 ? config::num_cores :
CpuInfo::num_cores(); // NOTE(review): relies on config::num_cores already being loaded when this validator runs; confirm ordering.
+ core_cap = std::max(1, core_cap);
+ int limit = config;
+ if (limit < 0) { // auto mode
+ limit = std::max(1, core_cap * 4 / 5); // 80% of cores, at least 1
+ }
+ omp_threads_limit = std::max(1, std::min(limit, core_cap)); // NOTE(review): writes the normalized value back to the global from inside its own validator.
+ return true;
+});
// The capacity of segment partial column cache, used to cache column readers
for each segment.
DEFINE_mInt32(max_segment_partial_column_cache_size, "100");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index df83ae46909..afba63bebe5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1642,7 +1642,8 @@ DECLARE_String(fuzzy_test_type);
// The maximum csv line reader output buffer size
DECLARE_mInt64(max_csv_line_reader_output_buffer_size);
-// Maximum number of OpenMP threads that can be used by each Doris thread
+// Maximum number of OpenMP threads available for concurrent index builds.
+// -1 means auto: use 80% of detected CPU cores; the value is clamped to [1, cores] by its validator.
DECLARE_Int32(omp_threads_limit);
// The capacity of segment partial column cache, used to cache column readers
for each segment.
DECLARE_mInt32(max_segment_partial_column_cache_size);
diff --git a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
index aacbd183beb..7c9d9bb22d0 100644
--- a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
@@ -19,12 +19,16 @@
#include <faiss/index_io.h>
#include <omp.h>
+#include <pthread.h>
+#include <algorithm>
#include <cmath>
+#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
+#include <mutex>
#include <string>
#include "CLucene/store/IndexInput.h"
@@ -42,11 +46,77 @@
#include "olap/rowset/segment_v2/ann_index/ann_index_files.h"
#include "olap/rowset/segment_v2/ann_index/ann_search_params.h"
#include "util/doris_metrics.h"
+#include "util/thread.h"
#include "util/time.h"
#include "vec/core/types.h"
namespace doris::segment_v2 {
#include "common/compile_check_begin.h"
+
+namespace {
+
+std::mutex g_omp_thread_mutex; // Guards g_index_threads_in_use.
+int g_index_threads_in_use = 0; // Sum of threads reserved by live ScopedOmpThreadBudget objects.
+
+// Guard that caps the total OpenMP threads used by concurrent index builds
+// at omp_threads_limit on a best-effort basis: each guard reserves at least 1 thread, so the sum can exceed the limit.
+class ScopedOmpThreadBudget { // NOTE(review): implicitly copyable; a copy would release the reservation twice. Consider deleting copy/move.
+public:
+ // For each index build, reserve at most half of the remaining threads, at
least 1 thread.
+ ScopedOmpThreadBudget() {
+ std::unique_lock<std::mutex> lock(g_omp_thread_mutex);
+ auto thread_cap = config::omp_threads_limit - g_index_threads_in_use; // Remaining budget; can be <= 0 under heavy concurrency.
+ _reserved_threads = std::max(1, thread_cap / 2); // NOTE(review): the floor of 1 lets the total exceed omp_threads_limit.
+ g_index_threads_in_use += _reserved_threads;
+
DorisMetrics::instance()->ann_index_build_index_threads->increment(_reserved_threads); // NOTE(review): this metric is an IntCounter but is decremented in the destructor; an IntGauge matches gauge semantics.
+ omp_set_num_threads(_reserved_threads); // NOTE(review): the destructor does not restore the previous OpenMP thread count for this thread.
+ VLOG_DEBUG << fmt::format(
+ "ScopedOmpThreadBudget reserve threads reserved={}, in_use={},
limit={}",
+ _reserved_threads, g_index_threads_in_use,
config::omp_threads_limit);
+ }
+
+ ~ScopedOmpThreadBudget() { // Returns this reservation to the shared budget.
+ std::lock_guard<std::mutex> lock(g_omp_thread_mutex);
+ g_index_threads_in_use -= _reserved_threads;
+
DorisMetrics::instance()->ann_index_build_index_threads->increment(-_reserved_threads);
+ if (g_index_threads_in_use < 0) { // Defensive clamp; not expected when reservations are balanced.
+ g_index_threads_in_use = 0;
+ }
+ VLOG_DEBUG << fmt::format(
+ "ScopedOmpThreadBudget release threads reserved={},
remaining_in_use={}, limit={}",
+ _reserved_threads, g_index_threads_in_use,
config::omp_threads_limit);
+ }
+
+private:
+ int _reserved_threads = 1;
+};
+
+// Temporarily rename the current thread so FAISS build phases are easier to
spot in debuggers.
+class ScopedThreadName { // NOTE(review): implicitly copyable; consider deleting copy/move.
+public:
+ explicit ScopedThreadName(const std::string& new_name) { // new_name should fit in 15 characters.
+ // pthread thread names (a non-POSIX extension) allow 15 visible chars plus the null
terminator.
+ char current_name[16] = {0};
+ int ret = pthread_getname_np(pthread_self(), current_name,
sizeof(current_name));
+ if (ret == 0) {
+ _has_previous_name = true;
+ _previous_name = current_name;
+ }
+ Thread::set_self_name(new_name);
+ }
+
+ ~ScopedThreadName() { // Restores the name only if it could be read in the constructor.
+ if (_has_previous_name) {
+ Thread::set_self_name(_previous_name);
+ }
+ }
+
+private:
+ bool _has_previous_name = false;
+ std::string _previous_name;
+};
+
+} // namespace
std::unique_ptr<faiss::IDSelector> FaissVectorIndex::roaring_to_faiss_selector(
const roaring::Roaring& roaring) {
std::vector<faiss::idx_t> ids;
@@ -154,7 +224,9 @@ public:
void FaissVectorIndex::train(vectorized::Int64 n, const float* x) { // Trains _index on n vectors starting at x.
DCHECK(x != nullptr);
DCHECK(_index != nullptr);
- omp_set_num_threads(config::omp_threads_limit);
+ ScopedThreadName scoped_name("faiss_train_idx"); // 15 chars, fits the thread-name limit.
+ // Reserve OpenMP threads globally so concurrent builds stay (best-effort) under
omp_threads_limit.
+ ScopedOmpThreadBudget thread_budget; // Held for the entire train call; released on scope exit.
_index->train(n, x);
}
@@ -169,11 +241,17 @@ void FaissVectorIndex::train(vectorized::Int64 n, const
float* x) {
doris::Status FaissVectorIndex::add(vectorized::Int64 n, const float* vec) { // Adds n vectors from vec to _index in chunks of at most 1,000,000 rows; always returns OK.
DCHECK(vec != nullptr);
DCHECK(_index != nullptr);
- omp_set_num_threads(config::omp_threads_limit);
- DorisMetrics::instance()->ann_index_construction->increment(1);
- _index->add(n, vec);
- DorisMetrics::instance()->ann_index_construction->increment(-1);
- DorisMetrics::instance()->ann_index_in_memory_rows_cnt->increment(n);
+ ScopedThreadName scoped_name("faiss_build_idx");
+ // build index for every 1M rows, so that we can adjust thread usage
dynamically.
+ for (vectorized::Int64 i = 0; i < n; i += 1'000'000) { // NOTE(review): the chunk size is repeated below; a named constant would keep both in sync.
+ // Apply the same thread budget when adding vectors to limit
concurrency.
+ ScopedOmpThreadBudget thread_budget; // Re-acquired per chunk, so the thread count can change between chunks.
+ DorisMetrics::instance()->ann_index_construction->increment(1);
+ vectorized::Int64 chunk_size = std::min(1'000'000L, n - i); // NOTE(review): std::min deduction compiles only if vectorized::Int64 is long; std::min<vectorized::Int64> is portable.
+ _index->add(chunk_size, vec + i * _dimension); // Offset assumes rows are contiguous with _dimension floats each.
+
DorisMetrics::instance()->ann_index_in_memory_rows_cnt->increment(chunk_size);
+ DorisMetrics::instance()->ann_index_construction->increment(-1);
+ }
return doris::Status::OK();
}
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index c95a9ef02a0..0c3a713aab7 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -250,6 +250,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_search_cnt,
MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_in_memory_cnt,
MetricUnit::NOUNIT);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_in_memory_rows_cnt,
MetricUnit::ROWS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_construction,
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_build_index_threads,
MetricUnit::NOUNIT); // NOTE(review): incremented and decremented by the thread-budget guard, i.e. used as a gauge; consider a gauge metric type.
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -418,6 +419,7 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
ann_index_in_memory_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
ann_index_in_memory_rows_cnt);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, ann_index_construction);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
ann_index_build_index_threads);
}
void DorisMetrics::initialize(bool init_system_metrics, const
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 906e8b9a7fa..6e1c3e1a30e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -258,6 +258,7 @@ public:
IntCounter* ann_index_in_memory_cnt = nullptr;
IntCounter* ann_index_in_memory_rows_cnt = nullptr;
IntCounter* ann_index_construction = nullptr;
+ IntCounter* ann_index_build_index_threads = nullptr;
IntGauge* runtime_filter_consumer_num = nullptr;
IntGauge* runtime_filter_consumer_ready_num = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]