This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 8ca399ab920 [exec](pipeline) runtime filter wait time (#35108) 8ca399ab920 is described below commit 8ca399ab920e5719baacf9740f1cd2fad8d5dd6c Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue May 21 12:50:05 2024 +0800 [exec](pipeline) runtime filter wait time (#35108) --- be/src/exprs/runtime_filter.h | 2 ++ be/src/pipeline/pipeline_x/dependency.cpp | 21 +++++++++++++++++++++ be/src/pipeline/pipeline_x/dependency.h | 17 ++++++++++++----- be/src/vec/exec/runtime_filter_consumer.cpp | 25 ++++++++++++++++++++++--- 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 781f7ac34ff..4733d39e298 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -244,6 +244,8 @@ public: bool has_remote_target() const { return _has_remote_target; } + bool has_local_target() const { return _has_local_target; } + bool is_ready() const { return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) || (_enable_pipeline_exec && diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index ba47f935598..093e26ff854 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -125,6 +125,27 @@ Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { return ready ? nullptr : this; } +// should check rf timeout in two case: +// 1. the rf is ready just remove the wait queue +// 2. 
if the rf has local dependencies, the rf should start waiting when all local dependencies are ready +bool RuntimeFilterTimer::should_be_check_timeout() { + if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) { + bool all_ready = true; + for (auto& dep : _local_runtime_filter_dependencies) { + if (!dep->ready()) { + all_ready = false; + break; + } + } + if (all_ready) { + _local_runtime_filter_dependencies.clear(); + _registration_time = MonotonicMillis(); + } + return all_ready; + } + return true; +} + void RuntimeFilterTimer::call_timeout() { _parent->set_ready(); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 693bde10f36..525a6dea562 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -112,6 +112,7 @@ public: BasicSharedState* shared_state() { return _shared_state; } void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; } virtual std::string debug_string(int indentation_level = 0); + bool ready() const { return _ready; } // Start the watcher. We use it to count how long this dependency block the current pipeline task. 
void start_watcher() { _watcher.start(); } @@ -256,11 +257,19 @@ public: int64_t registration_time() const { return _registration_time; } int32_t wait_time_ms() const { return _wait_time_ms; } + void set_local_runtime_filter_dependencies( + const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) { + _local_runtime_filter_dependencies = deps; + } + + bool should_be_check_timeout(); + private: friend struct RuntimeFilterTimerQueue; std::shared_ptr<RuntimeFilterDependency> _parent = nullptr; + std::vector<std::shared_ptr<RuntimeFilterDependency>> _local_runtime_filter_dependencies; std::mutex _lock; - const int64_t _registration_time; + int64_t _registration_time; const int32_t _wait_time_ms; }; @@ -283,11 +292,9 @@ struct RuntimeFilterTimerQueue { ~RuntimeFilterTimerQueue() = default; RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } - void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { push(filter); } - - void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) { + void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) { std::unique_lock<std::mutex> lc(_que_lock); - _que.push_back(filter); + _que.insert(_que.end(), filter.begin(), filter.end()); cv.notify_all(); } diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 2913fad3d8d..66fd0297c98 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -81,17 +81,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency( runtime_filter_dependencies, const int id, const int node_id, const std::string& name) { runtime_filter_dependencies.resize(_runtime_filter_descs.size()); + std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> runtime_filter_timers( + _runtime_filter_descs.size()); + std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>> + 
local_runtime_filter_dependencies; + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>( id, node_id, name, _state->get_query_ctx(), runtime_filter); _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); - auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>( + runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>( runtime_filter->registration_time(), runtime_filter->wait_time_ms(), runtime_filter_dependencies[i]); - runtime_filter->set_filter_timer(filter_timer); - ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); + runtime_filter->set_filter_timer(runtime_filter_timers[i]); + if (runtime_filter->has_local_target()) { + local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]); + } + } + + // The global runtime filter timer needs to set local runtime filter dependencies. + // start to wait before the local runtime filter ready + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + if (!runtime_filter->has_local_target()) { + runtime_filter_timers[i]->set_local_runtime_filter_dependencies( + local_runtime_filter_dependencies); + } } + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer( + std::move(runtime_filter_timers)); } Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org