This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8a166f8c352 branch-4.1: [codex] fix ANN OpenMP build budget and add concurrency test #61313 (#61652)
8a166f8c352 is described below

commit 8a166f8c3520a442438b9ac89560e11f172a3ade
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 16:39:39 2026 +0800

    branch-4.1: [codex] fix ANN OpenMP build budget and add concurrency test #61313 (#61652)
    
    Cherry-picked from #61313
    
    Co-authored-by: zhiqiang <[email protected]>
---
 be/src/storage/index/ann/faiss_ann_index.cpp       |  9 ++-
 .../storage/index/ann/faiss_vector_index_test.cpp  | 65 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/be/src/storage/index/ann/faiss_ann_index.cpp b/be/src/storage/index/ann/faiss_ann_index.cpp
index 77b865d82bf..70a2ed01bc9 100644
--- a/be/src/storage/index/ann/faiss_ann_index.cpp
+++ b/be/src/storage/index/ann/faiss_ann_index.cpp
@@ -62,6 +62,7 @@ namespace doris::segment_v2 {
 namespace {
 
 std::mutex g_omp_thread_mutex;
+std::condition_variable g_omp_thread_cv;
 int g_index_threads_in_use = 0;
 
 // Guard that ensures the total OpenMP threads used by concurrent index builds
@@ -71,7 +72,11 @@ public:
     // For each index build, reserve at most half of the remaining threads, at least 1 thread.
     ScopedOmpThreadBudget() {
         std::unique_lock<std::mutex> lock(g_omp_thread_mutex);
-        auto thread_cap = config::omp_threads_limit - g_index_threads_in_use;
+        auto omp_threads_limit = get_omp_threads_limit();
+        // Block until there is at least one OpenMP slot available under the global cap.
+        g_omp_thread_cv.wait(lock, [&] { return g_index_threads_in_use < omp_threads_limit; });
+        auto thread_cap = omp_threads_limit - g_index_threads_in_use;
+        // Keep headroom for other concurrent index builds: take up to half of remaining budget.
         _reserved_threads = std::max(1, thread_cap / 2);
         g_index_threads_in_use += _reserved_threads;
         DorisMetrics::instance()->ann_index_build_index_threads->increment(_reserved_threads);
@@ -88,6 +93,8 @@ public:
         if (g_index_threads_in_use < 0) {
             g_index_threads_in_use = 0;
         }
+        // Wake waiting index builders so they can compete for the released OpenMP budget.
+        g_omp_thread_cv.notify_all();
         VLOG_DEBUG << fmt::format(
                 "ScopedOmpThreadBudget release threads reserved={}, remaining_in_use={}, limit={}",
                 _reserved_threads, g_index_threads_in_use, get_omp_threads_limit());
diff --git a/be/test/storage/index/ann/faiss_vector_index_test.cpp b/be/test/storage/index/ann/faiss_vector_index_test.cpp
index 9b4016bbcb8..d9a61adcaeb 100644
--- a/be/test/storage/index/ann/faiss_vector_index_test.cpp
+++ b/be/test/storage/index/ann/faiss_vector_index_test.cpp
@@ -21,18 +21,24 @@
 #include <gtest/gtest.h>
 
 #include <algorithm>
+#include <atomic>
+#include <chrono>
 #include <cstddef>
 #include <limits>
 #include <memory>
 #include <random>
 #include <string>
+#include <thread>
 #include <vector>
 
+#include "common/config.h"
+#include "common/metrics/doris_metrics.h"
 #include "storage/index/ann/ann_index.h"
 #include "storage/index/ann/ann_search_params.h"
 #include "storage/index/ann/faiss_ann_index.h"
 // metrics.h not used directly here
 #include "storage/index/ann/vector_search_utils.h"
+#include "util/defer_op.h"
 
 using namespace doris::segment_v2;
 
@@ -233,6 +239,65 @@ TEST_F(VectorSearchTest, UpdateRoaring) {
     }
 }
 
+TEST_F(VectorSearchTest, OmpThreadBudgetNeverExceedsLimit) {
+    constexpr int kWorkers = 2;
+    constexpr int kDim = 64;
+    // Keep this workload small to avoid long-running BE UT under ASAN.
+    constexpr int kNumVectors = 500;
+
+    const auto old_omp_threads_limit = config::omp_threads_limit;
+    config::omp_threads_limit = 1;
+    Defer reset_omp_threads_limit(
+            [&old_omp_threads_limit]() { config::omp_threads_limit = old_omp_threads_limit; });
+
+    auto* budget_metric = DorisMetrics::instance()->ann_index_build_index_threads;
+    std::atomic<bool> start {false};
+    std::atomic<int> finished {0};
+    std::vector<std::thread> workers;
+    workers.reserve(kWorkers);
+
+    for (int worker_id = 0; worker_id < kWorkers; ++worker_id) {
+        workers.emplace_back([&start, &finished, worker_id]() {
+            auto index = std::make_unique<FaissVectorIndex>();
+            FaissBuildParameter params;
+            params.dim = kDim;
+            params.max_degree = 8;
+            params.ef_construction = 20;
+            params.index_type = FaissBuildParameter::IndexType::HNSW;
+            index->build(params);
+
+            std::vector<float> vectors(static_cast<size_t>(kNumVectors) * kDim,
+                                       static_cast<float>(worker_id + 1));
+            while (!start.load(std::memory_order_acquire)) {
+                std::this_thread::yield();
+            }
+
+            auto st = index->add(kNumVectors, vectors.data());
+            EXPECT_TRUE(st.ok()) << st.to_string();
+            finished.fetch_add(1, std::memory_order_acq_rel);
+        });
+    }
+
+    start.store(true, std::memory_order_release);
+
+    int64_t observed_peak = 0;
+    auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(20);
+    while (finished.load(std::memory_order_acquire) < kWorkers &&
+           std::chrono::steady_clock::now() < deadline) {
+        observed_peak = std::max<int64_t>(observed_peak, budget_metric->value());
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+
+    for (auto& worker : workers) {
+        worker.join();
+    }
+
+    observed_peak = std::max<int64_t>(observed_peak, budget_metric->value());
+    EXPECT_EQ(finished.load(std::memory_order_acquire), kWorkers);
+    EXPECT_LE(observed_peak, 1);
+    EXPECT_EQ(budget_metric->value(), 0);
+}
+
 TEST_F(VectorSearchTest, CompareResultWithNativeFaiss1) {
     const size_t iterations = 3;
     // Create random number generator


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to