This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 81b037280ccdd487c7820180c19747c73fbb4d67
Author: morningman <[email protected]>
AuthorDate: Wed Dec 31 17:01:38 2025 +0800

    add timed lock and PerScannerWaitSchedLockTime
---
 be/src/util/timed_lock.h                           | 297 +++++++++++++++++++++
 be/src/vec/exec/scan/scanner.h                     |   9 +
 be/src/vec/exec/scan/scanner_context.cpp           |  14 +
 be/src/vec/exec/scan/scanner_scheduler.h           |   2 +-
 be/src/vec/exec/scan/simplified_scan_scheduler.cpp |  24 +-
 5 files changed, 343 insertions(+), 3 deletions(-)

diff --git a/be/src/util/timed_lock.h b/be/src/util/timed_lock.h
new file mode 100644
index 00000000000..9b2b72ccb87
--- /dev/null
+++ b/be/src/util/timed_lock.h
@@ -0,0 +1,297 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <chrono>
+#include <cstdint>
+#include <mutex>
+#include <shared_mutex>
+
+namespace doris {
+
+// A RAII-style lock wrapper that can optionally record the time spent waiting 
for the lock.
+// Similar to std::unique_lock but with timing capabilities.
+//
+// Usage example:
+//   std::shared_mutex mutex;
+//   int64_t wait_time_ns = 0;
+//   {
+//       TimedLock<std::shared_mutex> lock(mutex, &wait_time_ns);
+//       // Critical section
+//   }
+//   // wait_time_ns now contains the time spent waiting for the lock in 
nanoseconds
+template <typename Mutex>
+class TimedLock {
+public:
+    using mutex_type = Mutex;
+
+    // Constructor that acquires the lock and optionally records wait time
+    // @param mutex: The mutex to lock
+    // @param wait_time_ns: Optional pointer to store the wait time in 
nanoseconds
+    explicit TimedLock(mutex_type& mutex, int64_t* wait_time_ns = nullptr)
+            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {
+        lock();
+    }
+
+    // Constructor with defer_lock - does not acquire the lock
+    TimedLock(mutex_type& mutex, std::defer_lock_t, int64_t* wait_time_ns = 
nullptr) noexcept
+            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {}
+
+    // Constructor with adopt_lock - assumes the calling thread already owns 
the lock
+    TimedLock(mutex_type& mutex, std::adopt_lock_t, int64_t* wait_time_ns = 
nullptr) noexcept
+            : _mutex(&mutex), _owns_lock(true), _wait_time_ns(wait_time_ns) {}
+
+    // Destructor - releases the lock if owned
+    ~TimedLock() {
+        if (_owns_lock) {
+            unlock();
+        }
+    }
+
+    // Delete copy constructor and assignment operator
+    TimedLock(const TimedLock&) = delete;
+    TimedLock& operator=(const TimedLock&) = delete;
+
+    // Move constructor
+    TimedLock(TimedLock&& other) noexcept
+            : _mutex(other._mutex),
+              _owns_lock(other._owns_lock),
+              _wait_time_ns(other._wait_time_ns) {
+        other._mutex = nullptr;
+        other._owns_lock = false;
+        other._wait_time_ns = nullptr;
+    }
+
+    // Move assignment operator
+    TimedLock& operator=(TimedLock&& other) noexcept {
+        if (this != &other) {
+            if (_owns_lock) {
+                unlock();
+            }
+            _mutex = other._mutex;
+            _owns_lock = other._owns_lock;
+            _wait_time_ns = other._wait_time_ns;
+            other._mutex = nullptr;
+            other._owns_lock = false;
+            other._wait_time_ns = nullptr;
+        }
+        return *this;
+    }
+
+    // Acquires the lock and records wait time if pointer is provided
+    void lock() {
+        if (!_mutex) {
+            throw 
std::system_error(std::make_error_code(std::errc::operation_not_permitted),
+                                    "TimedLock: mutex is null");
+        }
+        if (_owns_lock) {
+            throw 
std::system_error(std::make_error_code(std::errc::resource_deadlock_would_occur),
+                                    "TimedLock: already owns lock");
+        }
+
+        auto start = std::chrono::steady_clock::now();
+        _mutex->lock();
+        auto end = std::chrono::steady_clock::now();
+
+        _owns_lock = true;
+
+        if (_wait_time_ns) {
+            auto duration = 
std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
+            *_wait_time_ns = duration.count();
+        }
+    }
+
+    // Releases the lock
+    void unlock() {
+        if (!_owns_lock) {
+            throw 
std::system_error(std::make_error_code(std::errc::operation_not_permitted),
+                                    "TimedLock: does not own lock");
+        }
+        if (_mutex) {
+            _mutex->unlock();
+            _owns_lock = false;
+        }
+    }
+
+    // Checks whether this object owns the lock
+    bool owns_lock() const noexcept { return _owns_lock; }
+
+    // Checks whether this object owns the lock (for use in boolean contexts)
+    explicit operator bool() const noexcept { return owns_lock(); }
+
+    // Returns a pointer to the associated mutex
+    mutex_type* mutex() const noexcept { return _mutex; }
+
+    // Disassociates the mutex without unlocking it
+    mutex_type* release() noexcept {
+        mutex_type* ret = _mutex;
+        _mutex = nullptr;
+        _owns_lock = false;
+        return ret;
+    }
+
+    // Transfers ownership to a std::unique_lock
+    // This is useful when you need to pass the lock to a function that 
expects std::unique_lock
+    // After calling this method, this TimedLock object no longer owns the lock
+    std::unique_lock<mutex_type> transfer_to_unique_lock() noexcept {
+        if (!_owns_lock || !_mutex) {
+            return std::unique_lock<mutex_type>();
+        }
+        mutex_type* m = release();
+        return std::unique_lock<mutex_type>(*m, std::adopt_lock);
+    }
+
+private:
+    mutex_type* _mutex;
+    bool _owns_lock;
+    int64_t* _wait_time_ns;
+};
+
+// Specialization for shared_mutex with shared (read) lock
+template <typename Mutex>
+class TimedSharedLock {
+public:
+    using mutex_type = Mutex;
+
+    // Constructor that acquires the shared lock and optionally records wait 
time
+    // @param mutex: The mutex to lock
+    // @param wait_time_ns: Optional pointer to store the wait time in 
nanoseconds
+    explicit TimedSharedLock(mutex_type& mutex, int64_t* wait_time_ns = 
nullptr)
+            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {
+        lock();
+    }
+
+    // Constructor with defer_lock - does not acquire the lock
+    TimedSharedLock(mutex_type& mutex, std::defer_lock_t,
+                    int64_t* wait_time_ns = nullptr) noexcept
+            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {}
+
+    // Constructor with adopt_lock - assumes the calling thread already owns 
the lock
+    TimedSharedLock(mutex_type& mutex, std::adopt_lock_t,
+                    int64_t* wait_time_ns = nullptr) noexcept
+            : _mutex(&mutex), _owns_lock(true), _wait_time_ns(wait_time_ns) {}
+
+    // Destructor - releases the lock if owned
+    ~TimedSharedLock() {
+        if (_owns_lock) {
+            unlock();
+        }
+    }
+
+    // Delete copy constructor and assignment operator
+    TimedSharedLock(const TimedSharedLock&) = delete;
+    TimedSharedLock& operator=(const TimedSharedLock&) = delete;
+
+    // Move constructor
+    TimedSharedLock(TimedSharedLock&& other) noexcept
+            : _mutex(other._mutex),
+              _owns_lock(other._owns_lock),
+              _wait_time_ns(other._wait_time_ns) {
+        other._mutex = nullptr;
+        other._owns_lock = false;
+        other._wait_time_ns = nullptr;
+    }
+
+    // Move assignment operator
+    TimedSharedLock& operator=(TimedSharedLock&& other) noexcept {
+        if (this != &other) {
+            if (_owns_lock) {
+                unlock();
+            }
+            _mutex = other._mutex;
+            _owns_lock = other._owns_lock;
+            _wait_time_ns = other._wait_time_ns;
+            other._mutex = nullptr;
+            other._owns_lock = false;
+            other._wait_time_ns = nullptr;
+        }
+        return *this;
+    }
+
+    // Acquires the shared lock and records wait time if pointer is provided
+    void lock() {
+        if (!_mutex) {
+            throw 
std::system_error(std::make_error_code(std::errc::operation_not_permitted),
+                                    "TimedSharedLock: mutex is null");
+        }
+        if (_owns_lock) {
+            throw 
std::system_error(std::make_error_code(std::errc::resource_deadlock_would_occur),
+                                    "TimedSharedLock: already owns lock");
+        }
+
+        auto start = std::chrono::steady_clock::now();
+        _mutex->lock_shared();
+        auto end = std::chrono::steady_clock::now();
+
+        _owns_lock = true;
+
+        if (_wait_time_ns) {
+            auto duration = 
std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
+            *_wait_time_ns = duration.count();
+        }
+    }
+
+    // Releases the shared lock
+    void unlock() {
+        if (!_owns_lock) {
+            throw 
std::system_error(std::make_error_code(std::errc::operation_not_permitted),
+                                    "TimedSharedLock: does not own lock");
+        }
+        if (_mutex) {
+            _mutex->unlock_shared();
+            _owns_lock = false;
+        }
+    }
+
+    // Checks whether this object owns the lock
+    bool owns_lock() const noexcept { return _owns_lock; }
+
+    // Checks whether this object owns the lock (for use in boolean contexts)
+    explicit operator bool() const noexcept { return owns_lock(); }
+
+    // Returns a pointer to the associated mutex
+    mutex_type* mutex() const noexcept { return _mutex; }
+
+    // Disassociates the mutex without unlocking it
+    mutex_type* release() noexcept {
+        mutex_type* ret = _mutex;
+        _mutex = nullptr;
+        _owns_lock = false;
+        return ret;
+    }
+
+    // Transfers ownership to a std::shared_lock
+    // This is useful when you need to pass the lock to a function that 
expects std::shared_lock
+    // After calling this method, this TimedSharedLock object no longer owns 
the lock
+    std::shared_lock<mutex_type> transfer_to_shared_lock() noexcept {
+        if (!_owns_lock || !_mutex) {
+            return std::shared_lock<mutex_type>();
+        }
+        mutex_type* m = release();
+        return std::shared_lock<mutex_type>(*m, std::adopt_lock);
+    }
+
+private:
+    mutex_type* _mutex;
+    bool _owns_lock;
+    int64_t* _wait_time_ns;
+};
+
+} // namespace doris
+
+
diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h
index b4b0cd7dbe6..62651438295 100644
--- a/be/src/vec/exec/scan/scanner.h
+++ b/be/src/vec/exec/scan/scanner.h
@@ -149,6 +149,12 @@ public:
 
     int64_t get_scanner_wait_worker_timer() const { return 
_scanner_wait_worker_timer; }
 
+    void update_sched_lock_timer(int64_t delta_ns) { _scanner_sched_lock_timer 
+= delta_ns; }
+    void update_submit_count() { ++_scanner_submit_count; }
+
+    int64_t get_scanner_sched_lock_timer() const { return 
_scanner_sched_lock_timer; }
+    int64_t get_scanner_submit_count() const { return _scanner_submit_count; }
+
     void update_scan_cpu_timer();
 
     // Some counters need to be updated realtime, for example, workload group 
policy need
@@ -262,6 +268,9 @@ protected:
     int64_t _projection_timer = 0;
 
     bool _should_stop = false;
+
+    int64_t _scanner_sched_lock_timer = 0;
+    int64_t _scanner_submit_count = 0;
 };
 
 using ScannerSPtr = std::shared_ptr<Scanner>;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index c6c46223894..d364c83318c 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -388,10 +388,14 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
         std::stringstream scanner_rows_read;
         std::stringstream scanner_wait_worker_time;
         std::stringstream scanner_projection;
+        std::stringstream scanner_sched_lock_time;
+        std::stringstream scanner_summit_count;
         scanner_statistics << "[";
         scanner_rows_read << "[";
         scanner_wait_worker_time << "[";
         scanner_projection << "[";
+        scanner_sched_lock_time << "[";
+        scanner_summit_count << "[";
         // Scanners can in 3 state
         //  state 1: in scanner context, not scheduled
         //  state 2: in scanner worker pool's queue, scheduled but not running
@@ -415,6 +419,9 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
                     << 
PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                             TUnit::TIME_NS)
                     << ", ";
+
+            scanner_sched_lock_time << 
PrettyPrinter::print(scanner->_scanner->get_scanner_sched_lock_timer(), 
TUnit::TIME_NS) << ", ";
+            scanner_summit_count << 
PrettyPrinter::print(scanner->_scanner->get_scanner_submit_count(), 
TUnit::UNIT) << ", ";
             // since there are all scanners, some scanners is running, so that 
could not call scanner
             // close here.
         }
@@ -422,10 +429,14 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
         scanner_rows_read << "]";
         scanner_wait_worker_time << "]";
         scanner_projection << "]";
+        scanner_sched_lock_time << "]";
+        scanner_summit_count << "]";
         _scanner_profile->add_info_string("PerScannerRunningTime", 
scanner_statistics.str());
         _scanner_profile->add_info_string("PerScannerRowsRead", 
scanner_rows_read.str());
         _scanner_profile->add_info_string("PerScannerWaitTime", 
scanner_wait_worker_time.str());
         _scanner_profile->add_info_string("PerScannerProjectionTime", 
scanner_projection.str());
+        _scanner_profile->add_info_string("PerScannerWaitSchedLockTime", 
scanner_sched_lock_time.str());
+        _scanner_profile->add_info_string("PerScannerSubmitCount", 
scanner_summit_count.str());
     }
 }
 
@@ -576,6 +587,9 @@ Status 
ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan
             _set_scanner_done();
             return _process_status;
         }
+        if (auto s = scan_task_iter->scanner.lock()) {
+            s->_scanner->update_submit_count();
+        }
     }
 
     return Status::OK();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 26ed7f52811..33cf6cf0f83 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -156,7 +156,7 @@ public:
 #ifndef BE_TEST
         stop();
 #endif
-        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
+        LOG(INFO) << "Scanner sched " << _sched_name << " shutdown";
     }
 
     void stop() override {
diff --git a/be/src/vec/exec/scan/simplified_scan_scheduler.cpp 
b/be/src/vec/exec/scan/simplified_scan_scheduler.cpp
index befebda5732..9af09707c4f 100644
--- a/be/src/vec/exec/scan/simplified_scan_scheduler.cpp
+++ b/be/src/vec/exec/scan/simplified_scan_scheduler.cpp
@@ -18,7 +18,9 @@
 #include <memory>
 
 #include "scanner_scheduler.h"
+#include "util/timed_lock.h"
 #include "vec/exec/scan/scanner_context.h"
+#include "vec/exec/scan/scan_node.h"
 
 namespace doris::vectorized {
 class ScannerDelegate;
@@ -27,14 +29,32 @@ class ScanTask;
 Status TaskExecutorSimplifiedScanScheduler::schedule_scan_task(
         std::shared_ptr<ScannerContext> scanner_ctx, std::shared_ptr<ScanTask> 
current_scan_task,
         std::unique_lock<std::mutex>& transfer_lock) {
-    std::unique_lock<std::shared_mutex> wl(_lock);
+    // std::unique_lock<std::shared_mutex> wl(_lock);
+    int64_t lock_wait_time_ns = 0;
+    TimedLock<std::shared_mutex> timed_lock(_lock, &lock_wait_time_ns);
+    // LOG(INFO) << "yy debug lock_wait_time_ns: " << lock_wait_time_ns;
+    if (current_scan_task) {
+        if (auto s = current_scan_task->scanner.lock()) {
+            s->_scanner->update_sched_lock_timer(lock_wait_time_ns);
+        }
+    }
+    std::unique_lock<std::shared_mutex> wl = 
timed_lock.transfer_to_unique_lock();
     return scanner_ctx->schedule_scan_task(current_scan_task, transfer_lock, 
wl);
 }
 
 Status ThreadPoolSimplifiedScanScheduler::schedule_scan_task(
         std::shared_ptr<ScannerContext> scanner_ctx, std::shared_ptr<ScanTask> 
current_scan_task,
         std::unique_lock<std::mutex>& transfer_lock) {
-    std::unique_lock<std::shared_mutex> wl(_lock);
+    // std::unique_lock<std::shared_mutex> wl(_lock);
+    int64_t lock_wait_time_ns = 0;
+    TimedLock<std::shared_mutex> timed_lock(_lock, &lock_wait_time_ns);
+    // LOG(INFO) << "yy debug 2 lock_wait_time_ns: " << lock_wait_time_ns;
+    if (current_scan_task) {
+        if (auto s = current_scan_task->scanner.lock()) {
+            s->_scanner->update_sched_lock_timer(lock_wait_time_ns);
+        }
+    }
+    std::unique_lock<std::shared_mutex> wl = 
timed_lock.transfer_to_unique_lock();
     return scanner_ctx->schedule_scan_task(current_scan_task, transfer_lock, 
wl);
 }
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to