This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0e95a6a7747 [exec](pipeline) runtime filter wait time (#34872) 0e95a6a7747 is described below commit 0e95a6a77477d72488f3cb43af4f50e2541dcf04 Author: HappenLee <happen...@hotmail.com> AuthorDate: Mon May 20 23:43:59 2024 +0800 [exec](pipeline) runtime filter wait time (#34872) --- be/src/exprs/runtime_filter.h | 2 ++ be/src/pipeline/dependency.cpp | 39 +++++++++++++++++++++++------ be/src/pipeline/dependency.h | 17 +++++++++---- be/src/vec/exec/runtime_filter_consumer.cpp | 25 +++++++++++++++--- 4 files changed, 68 insertions(+), 15 deletions(-) diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 0deb7c3ddd7..3cedde8e8a5 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -244,6 +244,8 @@ public: bool has_remote_target() const { return _has_remote_target; } + bool has_local_target() const { return _has_local_target; } + bool is_ready() const { return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) || (_enable_pipeline_exec && diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index d37e3dc0401..e736174764d 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -119,6 +119,27 @@ void RuntimeFilterTimer::call_ready() { _parent->set_ready(); } +// should check rf timeout in two case: +// 1. the rf is ready just remove the wait queue +// 2. 
if the rf has local dependencies, the rf should start waiting once all local dependencies are ready +bool RuntimeFilterTimer::should_be_check_timeout() { + if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) { + bool all_ready = true; + for (auto& dep : _local_runtime_filter_dependencies) { + if (!dep->ready()) { + all_ready = false; + break; + } + } + if (all_ready) { + _local_runtime_filter_dependencies.clear(); + _registration_time = MonotonicMillis(); + } + return all_ready; + } + return true; +} + void RuntimeFilterTimerQueue::start() { while (!_stop) { std::unique_lock<std::mutex> lk(cv_m); @@ -135,14 +156,18 @@ void RuntimeFilterTimerQueue::start() { for (auto& it : _que) { if (it.use_count() == 1) { // `use_count == 1` means this runtime filter has been released - } else if (it->_parent->is_blocked_by(nullptr)) { - // This means runtime filter is not ready, so we call timeout or continue to poll this timer. 
+ int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); + if (ms_since_registration > it->wait_time_ms()) { + it->call_timeout(); + } else { + new_que.push_back(std::move(it)); + } } + } else { + new_que.push_back(std::move(it)); } } new_que.swap(_que); diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index de2e81bcda3..580cb8368c8 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -107,6 +107,7 @@ public: BasicSharedState* shared_state() { return _shared_state; } void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; } virtual std::string debug_string(int indentation_level = 0); + bool ready() const { return _ready; } // Start the watcher. We use it to count how long this dependency block the current pipeline task. void start_watcher() { _watcher.start(); } @@ -231,11 +232,19 @@ public: int64_t registration_time() const { return _registration_time; } int32_t wait_time_ms() const { return _wait_time_ms; } + void set_local_runtime_filter_dependencies( + const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) { + _local_runtime_filter_dependencies = deps; + } + + bool should_be_check_timeout(); + private: friend struct RuntimeFilterTimerQueue; std::shared_ptr<RuntimeFilterDependency> _parent = nullptr; + std::vector<std::shared_ptr<RuntimeFilterDependency>> _local_runtime_filter_dependencies; std::mutex _lock; - const int64_t _registration_time; + int64_t _registration_time; const int32_t _wait_time_ms; }; @@ -258,11 +267,9 @@ struct RuntimeFilterTimerQueue { ~RuntimeFilterTimerQueue() = default; RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } - void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { push(filter); } - - void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { + void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) { 
std::unique_lock<std::mutex> lc(_que_lock); - _que.push_back(filter); + _que.insert(_que.end(), filter.begin(), filter.end()); cv.notify_all(); } diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 0c6766b8bca..f80be824d58 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -67,17 +67,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( runtime_filter_dependencies, const int id, const int node_id, const std::string& name) { runtime_filter_dependencies.resize(_runtime_filter_descs.size()); + std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> runtime_filter_timers( + _runtime_filter_descs.size()); + std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>> + local_runtime_filter_dependencies; + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>( id, node_id, name, runtime_filter); _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); - auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>( + runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>( runtime_filter->registration_time(), runtime_filter->wait_time_ms(), runtime_filter_dependencies[i]); - runtime_filter->set_filter_timer(filter_timer); - ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); + runtime_filter->set_filter_timer(runtime_filter_timers[i]); + if (runtime_filter->has_local_target()) { + local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]); + } + } + + // The global runtime filter timer needs to have the local runtime filter dependencies set. 
+ // so that it does not start waiting before the local runtime filters are ready + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + if (!runtime_filter->has_local_target()) { + runtime_filter_timers[i]->set_local_runtime_filter_dependencies( + local_runtime_filter_dependencies); + } + } + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer( + std::move(runtime_filter_timers)); } Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org