This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 8ca399ab920 [exec](pipeline) runtime filter wait time (#35108)
8ca399ab920 is described below

commit 8ca399ab920e5719baacf9740f1cd2fad8d5dd6c
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Tue May 21 12:50:05 2024 +0800

    [exec](pipeline) runtime filter wait time (#35108)
---
 be/src/exprs/runtime_filter.h               |  2 ++
 be/src/pipeline/pipeline_x/dependency.cpp   | 21 +++++++++++++++++++++
 be/src/pipeline/pipeline_x/dependency.h     | 17 ++++++++++++-----
 be/src/vec/exec/runtime_filter_consumer.cpp | 25 ++++++++++++++++++++++---
 4 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 781f7ac34ff..4733d39e298 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -244,6 +244,8 @@ public:
 
     bool has_remote_target() const { return _has_remote_target; }
 
+    bool has_local_target() const { return _has_local_target; }
+
     bool is_ready() const {
         return (!_enable_pipeline_exec && _rf_state == 
RuntimeFilterState::READY) ||
                (_enable_pipeline_exec &&
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index ba47f935598..093e26ff854 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -125,6 +125,27 @@ Dependency* 
RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
     return ready ? nullptr : this;
 }
 
+// should check rf timeout in two case:
+// 1. the rf is ready just remove the wait queue
+// 2. if the rf have local dependency, the rf should start wait when all local 
dependency is ready
+bool RuntimeFilterTimer::should_be_check_timeout() {
+    if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) {
+        bool all_ready = true;
+        for (auto& dep : _local_runtime_filter_dependencies) {
+            if (!dep->ready()) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            _local_runtime_filter_dependencies.clear();
+            _registration_time = MonotonicMillis();
+        }
+        return all_ready;
+    }
+    return true;
+}
+
 void RuntimeFilterTimer::call_timeout() {
     _parent->set_ready();
 }
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 693bde10f36..525a6dea562 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -112,6 +112,7 @@ public:
     BasicSharedState* shared_state() { return _shared_state; }
     void set_shared_state(BasicSharedState* shared_state) { _shared_state = 
shared_state; }
     virtual std::string debug_string(int indentation_level = 0);
+    bool ready() const { return _ready; }
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
     void start_watcher() { _watcher.start(); }
@@ -256,11 +257,19 @@ public:
     int64_t registration_time() const { return _registration_time; }
     int32_t wait_time_ms() const { return _wait_time_ms; }
 
+    void set_local_runtime_filter_dependencies(
+            const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) 
{
+        _local_runtime_filter_dependencies = deps;
+    }
+
+    bool should_be_check_timeout();
+
 private:
     friend struct RuntimeFilterTimerQueue;
     std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
+    std::vector<std::shared_ptr<RuntimeFilterDependency>> 
_local_runtime_filter_dependencies;
     std::mutex _lock;
-    const int64_t _registration_time;
+    int64_t _registration_time;
     const int32_t _wait_time_ms;
 };
 
@@ -283,11 +292,9 @@ struct RuntimeFilterTimerQueue {
 
     ~RuntimeFilterTimerQueue() = default;
     RuntimeFilterTimerQueue() { _thread = 
std::thread(&RuntimeFilterTimerQueue::start, this); }
-    void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> 
filter) { push(filter); }
-
-    void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
+    void 
push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& 
filter) {
         std::unique_lock<std::mutex> lc(_que_lock);
-        _que.push_back(filter);
+        _que.insert(_que.end(), filter.begin(), filter.end());
         cv.notify_all();
     }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 2913fad3d8d..66fd0297c98 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -81,17 +81,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
                 runtime_filter_dependencies,
         const int id, const int node_id, const std::string& name) {
     runtime_filter_dependencies.resize(_runtime_filter_descs.size());
+    std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> 
runtime_filter_timers(
+            _runtime_filter_descs.size());
+    std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>
+            local_runtime_filter_dependencies;
+
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
         runtime_filter_dependencies[i] = 
std::make_shared<pipeline::RuntimeFilterDependency>(
                 id, node_id, name, _state->get_query_ctx(), runtime_filter);
         _runtime_filter_ctxs[i].runtime_filter_dependency = 
runtime_filter_dependencies[i].get();
-        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+        runtime_filter_timers[i] = 
std::make_shared<pipeline::RuntimeFilterTimer>(
                 runtime_filter->registration_time(), 
runtime_filter->wait_time_ms(),
                 runtime_filter_dependencies[i]);
-        runtime_filter->set_filter_timer(filter_timer);
-        
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
+        runtime_filter->set_filter_timer(runtime_filter_timers[i]);
+        if (runtime_filter->has_local_target()) {
+            
local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]);
+        }
+    }
+
+    // The gloabl runtime filter timer need set local runtime filter 
dependencies.
+    // start to wait before the local runtime filter ready
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
+        if (!runtime_filter->has_local_target()) {
+            runtime_filter_timers[i]->set_local_runtime_filter_dependencies(
+                    local_runtime_filter_dependencies);
+        }
     }
+    ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(
+            std::move(runtime_filter_timers));
 }
 
 Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to