This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpc_preview4-external in repository https://gitbox.apache.org/repos/asf/doris.git
// Patch header carried with this file in the commit notification (reproduced
// verbatim; the line counts refer to the file as originally committed):
//
// commit 3122b6f572dfb70894ea29158f0ef050173f0f27
// Author: morningman <[email protected]>
// AuthorDate: Wed Dec 31 17:01:38 2025 +0800
//
//     add timed lock and PerScannerWaitSchedLockTime
// ---
//  be/src/util/timed_lock.h                           | 297 +++++++++++++++++++++
//  be/src/vec/exec/scan/scanner.h                     | 9 +
//  be/src/vec/exec/scan/scanner_context.cpp           | 14 +
//  be/src/vec/exec/scan/scanner_scheduler.h           | 2 +-
//  be/src/vec/exec/scan/simplified_scan_scheduler.cpp | 24 +-
//  5 files changed, 343 insertions(+), 3 deletions(-)
//
// diff --git a/be/src/util/timed_lock.h b/be/src/util/timed_lock.h
// new file mode 100644
// index 00000000000..9b2b72ccb87
// --- /dev/null
// +++ b/be/src/util/timed_lock.h
// @@ -0,0 +1,297 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <chrono>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <system_error> // std::system_error, std::errc, std::make_error_code
#include <utility>      // std::exchange

namespace doris {

// A RAII-style exclusive lock wrapper that can optionally record the time spent
// waiting for the lock. Modeled on std::unique_lock, with timing added.
//
// The wait time is measured with std::chrono::steady_clock around the call to
// Mutex::lock() and is *stored* (not accumulated) into the caller-provided
// int64_t on every successful lock(); callers that want a running total must
// add it up themselves.
//
// Invariant maintained by every member: if _owns_lock is true then _mutex is
// non-null.
//
// Usage example:
//   std::shared_mutex mutex;
//   int64_t wait_time_ns = 0;
//   {
//       TimedLock<std::shared_mutex> lock(mutex, &wait_time_ns);
//       // Critical section
//   }
//   // wait_time_ns now contains the time spent waiting for the lock in nanoseconds
template <typename Mutex>
class TimedLock {
public:
    using mutex_type = Mutex;

    // Acquires the lock immediately and optionally records the wait time.
    // @param mutex: The mutex to lock
    // @param wait_time_ns: Optional pointer that receives the wait time in nanoseconds
    explicit TimedLock(mutex_type& mutex, int64_t* wait_time_ns = nullptr)
            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {
        lock();
    }

    // defer_lock: associates the mutex but does not acquire it.
    TimedLock(mutex_type& mutex, std::defer_lock_t, int64_t* wait_time_ns = nullptr) noexcept
            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {}

    // adopt_lock: assumes the calling thread already holds the lock. Nothing is
    // written to wait_time_ns by this constructor.
    TimedLock(mutex_type& mutex, std::adopt_lock_t, int64_t* wait_time_ns = nullptr) noexcept
            : _mutex(&mutex), _owns_lock(true), _wait_time_ns(wait_time_ns) {}

    // Releases the lock if it is owned.
    ~TimedLock() {
        if (_owns_lock) {
            unlock();
        }
    }

    // Not copyable.
    TimedLock(const TimedLock&) = delete;
    TimedLock& operator=(const TimedLock&) = delete;

    // Move constructor: takes over the mutex, ownership flag and timing target;
    // the source is left with no mutex and not owning.
    TimedLock(TimedLock&& other) noexcept
            : _mutex(std::exchange(other._mutex, nullptr)),
              _owns_lock(std::exchange(other._owns_lock, false)),
              _wait_time_ns(std::exchange(other._wait_time_ns, nullptr)) {}

    // Move assignment: releases any lock currently held, then takes over the
    // state of `other` exactly as the move constructor does.
    TimedLock& operator=(TimedLock&& other) noexcept {
        if (this != &other) {
            if (_owns_lock) {
                unlock();
            }
            _mutex = std::exchange(other._mutex, nullptr);
            _owns_lock = std::exchange(other._owns_lock, false);
            _wait_time_ns = std::exchange(other._wait_time_ns, nullptr);
        }
        return *this;
    }

    // Acquires the lock and, if a timing target was supplied, overwrites it
    // with the nanoseconds spent inside Mutex::lock().
    // @throws std::system_error (operation_not_permitted) if no mutex is associated
    // @throws std::system_error (resource_deadlock_would_occur) if already owned
    void lock() {
        if (!_mutex) {
            throw std::system_error(std::make_error_code(std::errc::operation_not_permitted),
                                    "TimedLock: mutex is null");
        }
        if (_owns_lock) {
            throw std::system_error(std::make_error_code(std::errc::resource_deadlock_would_occur),
                                    "TimedLock: already owns lock");
        }

        const auto start = std::chrono::steady_clock::now();
        _mutex->lock();
        const auto end = std::chrono::steady_clock::now();

        _owns_lock = true;

        if (_wait_time_ns) {
            *_wait_time_ns =
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
        }
    }

    // Releases the lock.
    // @throws std::system_error (operation_not_permitted) if the lock is not owned
    void unlock() {
        if (!_owns_lock) {
            throw std::system_error(std::make_error_code(std::errc::operation_not_permitted),
                                    "TimedLock: does not own lock");
        }
        // _owns_lock implies _mutex != nullptr (see class invariant), so no
        // separate null check is needed here.
        _mutex->unlock();
        _owns_lock = false;
    }

    // Checks whether this object owns the lock
    bool owns_lock() const noexcept { return _owns_lock; }

    // Checks whether this object owns the lock (for use in boolean contexts)
    explicit operator bool() const noexcept { return owns_lock(); }

    // Returns a pointer to the associated mutex (nullptr if none)
    mutex_type* mutex() const noexcept { return _mutex; }

    // Disassociates the mutex without unlocking it and returns it. If the lock
    // was held, the caller becomes responsible for unlocking the mutex.
    mutex_type* release() noexcept {
        _owns_lock = false;
        return std::exchange(_mutex, nullptr);
    }

    // Transfers ownership to a std::unique_lock.
    // This is useful when you need to pass the lock to a function that expects
    // std::unique_lock. After the call this TimedLock no longer owns the lock.
    // If the lock is not owned, an empty std::unique_lock is returned and this
    // object is left unchanged.
    std::unique_lock<mutex_type> transfer_to_unique_lock() noexcept {
        if (!_owns_lock || !_mutex) {
            return std::unique_lock<mutex_type>();
        }
        mutex_type* m = release();
        return std::unique_lock<mutex_type>(*m, std::adopt_lock);
    }

private:
    mutex_type* _mutex;
    bool _owns_lock;
    int64_t* _wait_time_ns;
};

// Shared (read) counterpart of TimedLock. This is an independent class
// template, not a specialization: it has the same interface but acquires the
// mutex with lock_shared() and releases it with unlock_shared(), so Mutex must
// provide those two members (e.g. std::shared_mutex).
//
// As with TimedLock, the wait time is stored (overwritten) on every successful
// lock(), and _owns_lock being true implies _mutex is non-null.
template <typename Mutex>
class TimedSharedLock {
public:
    using mutex_type = Mutex;

    // Acquires the shared lock immediately and optionally records the wait time.
    // @param mutex: The mutex to lock
    // @param wait_time_ns: Optional pointer that receives the wait time in nanoseconds
    explicit TimedSharedLock(mutex_type& mutex, int64_t* wait_time_ns = nullptr)
            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {
        lock();
    }

    // defer_lock: associates the mutex but does not acquire it.
    TimedSharedLock(mutex_type& mutex, std::defer_lock_t,
                    int64_t* wait_time_ns = nullptr) noexcept
            : _mutex(&mutex), _owns_lock(false), _wait_time_ns(wait_time_ns) {}

    // adopt_lock: assumes the calling thread already holds the shared lock.
    // Nothing is written to wait_time_ns by this constructor.
    TimedSharedLock(mutex_type& mutex, std::adopt_lock_t,
                    int64_t* wait_time_ns = nullptr) noexcept
            : _mutex(&mutex), _owns_lock(true), _wait_time_ns(wait_time_ns) {}

    // Releases the shared lock if it is owned.
    ~TimedSharedLock() {
        if (_owns_lock) {
            unlock();
        }
    }

    // Not copyable.
    TimedSharedLock(const TimedSharedLock&) = delete;
    TimedSharedLock& operator=(const TimedSharedLock&) = delete;

    // Move constructor: takes over the mutex, ownership flag and timing target;
    // the source is left with no mutex and not owning.
    TimedSharedLock(TimedSharedLock&& other) noexcept
            : _mutex(std::exchange(other._mutex, nullptr)),
              _owns_lock(std::exchange(other._owns_lock, false)),
              _wait_time_ns(std::exchange(other._wait_time_ns, nullptr)) {}

    // Move assignment: releases any shared lock currently held, then takes over
    // the state of `other` exactly as the move constructor does.
    TimedSharedLock& operator=(TimedSharedLock&& other) noexcept {
        if (this != &other) {
            if (_owns_lock) {
                unlock();
            }
            _mutex = std::exchange(other._mutex, nullptr);
            _owns_lock = std::exchange(other._owns_lock, false);
            _wait_time_ns = std::exchange(other._wait_time_ns, nullptr);
        }
        return *this;
    }

    // Acquires the shared lock and, if a timing target was supplied, overwrites
    // it with the nanoseconds spent inside Mutex::lock_shared().
    // @throws std::system_error (operation_not_permitted) if no mutex is associated
    // @throws std::system_error (resource_deadlock_would_occur) if already owned
    void lock() {
        if (!_mutex) {
            throw std::system_error(std::make_error_code(std::errc::operation_not_permitted),
                                    "TimedSharedLock: mutex is null");
        }
        if (_owns_lock) {
            throw std::system_error(std::make_error_code(std::errc::resource_deadlock_would_occur),
                                    "TimedSharedLock: already owns lock");
        }

        const auto start = std::chrono::steady_clock::now();
        _mutex->lock_shared();
        const auto end = std::chrono::steady_clock::now();

        _owns_lock = true;

        if (_wait_time_ns) {
            *_wait_time_ns =
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
        }
    }

    // Releases the shared lock.
    // @throws std::system_error (operation_not_permitted) if the lock is not owned
    void unlock() {
        if (!_owns_lock) {
            throw std::system_error(std::make_error_code(std::errc::operation_not_permitted),
                                    "TimedSharedLock: does not own lock");
        }
        // _owns_lock implies _mutex != nullptr (see class comment), so no
        // separate null check is needed here.
        _mutex->unlock_shared();
        _owns_lock = false;
    }

    // Checks whether this object owns the lock
    bool owns_lock() const noexcept { return _owns_lock; }

    // Checks whether this object owns the lock (for use in boolean contexts)
    explicit operator bool() const noexcept { return owns_lock(); }

    // Returns a pointer to the associated mutex (nullptr if none)
    mutex_type* mutex() const noexcept { return _mutex; }

    // Disassociates the mutex without unlocking it and returns it. If the
    // shared lock was held, the caller becomes responsible for releasing it.
    mutex_type* release() noexcept {
        _owns_lock = false;
        return std::exchange(_mutex, nullptr);
    }

    // Transfers ownership to a std::shared_lock.
    // This is useful when you need to pass the lock to a function that expects
    // std::shared_lock. After the call this TimedSharedLock no longer owns the
    // lock. If the lock is not owned, an empty std::shared_lock is returned and
    // this object is left unchanged.
    std::shared_lock<mutex_type> transfer_to_shared_lock() noexcept {
        if (!_owns_lock || !_mutex) {
            return std::shared_lock<mutex_type>();
        }
        mutex_type* m = release();
        return std::shared_lock<mutex_type>(*m, std::adopt_lock);
    }

private:
    mutex_type* _mutex;
    bool _owns_lock;
    int64_t* _wait_time_ns;
};

} // namespace doris
+ diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h index b4b0cd7dbe6..62651438295 100644 --- a/be/src/vec/exec/scan/scanner.h +++ b/be/src/vec/exec/scan/scanner.h @@ -149,6 +149,12 @@ public: int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; } + void update_sched_lock_timer(int64_t delta_ns) { _scanner_sched_lock_timer += delta_ns; } + void update_submit_count() { ++_scanner_submit_count; } + + int64_t get_scanner_sched_lock_timer() const { return _scanner_sched_lock_timer; } + int64_t get_scanner_submit_count() const { return _scanner_submit_count; } + void update_scan_cpu_timer(); // Some counters need to be updated realtime, for example, workload group policy need @@ -262,6 +268,9 @@ protected: int64_t _projection_timer = 0; bool _should_stop = false; + + int64_t _scanner_sched_lock_timer = 0; + int64_t _scanner_submit_count = 0; }; using ScannerSPtr = std::shared_ptr<Scanner>; diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index c6c46223894..d364c83318c 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -388,10 +388,14 @@ void ScannerContext::stop_scanners(RuntimeState* state) { std::stringstream scanner_rows_read; std::stringstream scanner_wait_worker_time; std::stringstream scanner_projection; + std::stringstream scanner_sched_lock_time; + std::stringstream scanner_summit_count; scanner_statistics << "["; scanner_rows_read << "["; scanner_wait_worker_time << "["; scanner_projection << "["; + scanner_sched_lock_time << "["; + scanner_summit_count << "["; // Scanners can in 3 state // state 1: in scanner context, not scheduled // state 2: in scanner worker pool's queue, scheduled but not running @@ -415,6 +419,9 @@ void ScannerContext::stop_scanners(RuntimeState* state) { << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(), TUnit::TIME_NS) << ", "; + + scanner_sched_lock_time 
<< PrettyPrinter::print(scanner->_scanner->get_scanner_sched_lock_timer(), TUnit::TIME_NS) << ", "; + scanner_summit_count << PrettyPrinter::print(scanner->_scanner->get_scanner_submit_count(), TUnit::UNIT) << ", "; // since there are all scanners, some scanners is running, so that could not call scanner // close here. } @@ -422,10 +429,14 @@ void ScannerContext::stop_scanners(RuntimeState* state) { scanner_rows_read << "]"; scanner_wait_worker_time << "]"; scanner_projection << "]"; + scanner_sched_lock_time << "]"; + scanner_summit_count << "]"; _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str()); _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str()); _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str()); + _scanner_profile->add_info_string("PerScannerWaitSchedLockTime", scanner_sched_lock_time.str()); + _scanner_profile->add_info_string("PerScannerSubmitCount", scanner_summit_count.str()); } } @@ -576,6 +587,9 @@ Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan _set_scanner_done(); return _process_status; } + if (auto s = scan_task_iter->scanner.lock()) { + s->_scanner->update_submit_count(); + } } return Status::OK(); diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 26ed7f52811..33cf6cf0f83 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -156,7 +156,7 @@ public: #ifndef BE_TEST stop(); #endif - LOG(INFO) << "Scanner sche " << _sched_name << " shutdown"; + LOG(INFO) << "Scanner sched " << _sched_name << " shutdown"; } void stop() override { diff --git a/be/src/vec/exec/scan/simplified_scan_scheduler.cpp b/be/src/vec/exec/scan/simplified_scan_scheduler.cpp index befebda5732..9af09707c4f 100644 --- 
a/be/src/vec/exec/scan/simplified_scan_scheduler.cpp +++ b/be/src/vec/exec/scan/simplified_scan_scheduler.cpp @@ -18,7 +18,9 @@ #include <memory> #include "scanner_scheduler.h" +#include "util/timed_lock.h" #include "vec/exec/scan/scanner_context.h" +#include "vec/exec/scan/scan_node.h" namespace doris::vectorized { class ScannerDelegate; @@ -27,14 +29,32 @@ class ScanTask; Status TaskExecutorSimplifiedScanScheduler::schedule_scan_task( std::shared_ptr<ScannerContext> scanner_ctx, std::shared_ptr<ScanTask> current_scan_task, std::unique_lock<std::mutex>& transfer_lock) { - std::unique_lock<std::shared_mutex> wl(_lock); + // std::unique_lock<std::shared_mutex> wl(_lock); + int64_t lock_wait_time_ns = 0; + TimedLock<std::shared_mutex> timed_lock(_lock, &lock_wait_time_ns); + // LOG(INFO) << "yy debug lock_wait_time_ns: " << lock_wait_time_ns; + if (current_scan_task) { + if (auto s = current_scan_task->scanner.lock()) { + s->_scanner->update_sched_lock_timer(lock_wait_time_ns); + } + } + std::unique_lock<std::shared_mutex> wl = timed_lock.transfer_to_unique_lock(); return scanner_ctx->schedule_scan_task(current_scan_task, transfer_lock, wl); } Status ThreadPoolSimplifiedScanScheduler::schedule_scan_task( std::shared_ptr<ScannerContext> scanner_ctx, std::shared_ptr<ScanTask> current_scan_task, std::unique_lock<std::mutex>& transfer_lock) { - std::unique_lock<std::shared_mutex> wl(_lock); + // std::unique_lock<std::shared_mutex> wl(_lock); + int64_t lock_wait_time_ns = 0; + TimedLock<std::shared_mutex> timed_lock(_lock, &lock_wait_time_ns); + // LOG(INFO) << "yy debug 2 lock_wait_time_ns: " << lock_wait_time_ns; + if (current_scan_task) { + if (auto s = current_scan_task->scanner.lock()) { + s->_scanner->update_sched_lock_timer(lock_wait_time_ns); + } + } + std::unique_lock<std::shared_mutex> wl = timed_lock.transfer_to_unique_lock(); return scanner_ctx->schedule_scan_task(current_scan_task, transfer_lock, wl); } } // namespace doris::vectorized 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
