This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e95a6a7747 [exec](pipeline) runtime filter wait time (#34872)
0e95a6a7747 is described below

commit 0e95a6a77477d72488f3cb43af4f50e2541dcf04
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon May 20 23:43:59 2024 +0800

    [exec](pipeline) runtime filter wait time (#34872)
---
 be/src/exprs/runtime_filter.h               |  2 ++
 be/src/pipeline/dependency.cpp              | 39 +++++++++++++++++++++++------
 be/src/pipeline/dependency.h                | 17 +++++++++----
 be/src/vec/exec/runtime_filter_consumer.cpp | 25 +++++++++++++++---
 4 files changed, 68 insertions(+), 15 deletions(-)

diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 0deb7c3ddd7..3cedde8e8a5 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -244,6 +244,8 @@ public:
 
     bool has_remote_target() const { return _has_remote_target; }
 
+    bool has_local_target() const { return _has_local_target; }
+
     bool is_ready() const {
         return (!_enable_pipeline_exec && _rf_state == 
RuntimeFilterState::READY) ||
                (_enable_pipeline_exec &&
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index d37e3dc0401..e736174764d 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -119,6 +119,27 @@ void RuntimeFilterTimer::call_ready() {
     _parent->set_ready();
 }
 
+// should check rf timeout in two case:
+// 1. the rf is ready just remove the wait queue
+// 2. if the rf have local dependency, the rf should start wait when all local 
dependency is ready
+bool RuntimeFilterTimer::should_be_check_timeout() {
+    if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) {
+        bool all_ready = true;
+        for (auto& dep : _local_runtime_filter_dependencies) {
+            if (!dep->ready()) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            _local_runtime_filter_dependencies.clear();
+            _registration_time = MonotonicMillis();
+        }
+        return all_ready;
+    }
+    return true;
+}
+
 void RuntimeFilterTimerQueue::start() {
     while (!_stop) {
         std::unique_lock<std::mutex> lk(cv_m);
@@ -135,14 +156,18 @@ void RuntimeFilterTimerQueue::start() {
             for (auto& it : _que) {
                 if (it.use_count() == 1) {
                     // `use_count == 1` means this runtime filter has been 
released
-                } else if (it->_parent->is_blocked_by(nullptr)) {
-                    // This means runtime filter is not ready, so we call 
timeout or continue to poll this timer.
-                    int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
-                    if (ms_since_registration > it->wait_time_ms()) {
-                        it->call_timeout();
-                    } else {
-                        new_que.push_back(std::move(it));
+                } else if (it->should_be_check_timeout()) {
+                    if (it->_parent->is_blocked_by(nullptr)) {
+                        // This means runtime filter is not ready, so we call 
timeout or continue to poll this timer.
+                        int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
+                        if (ms_since_registration > it->wait_time_ms()) {
+                            it->call_timeout();
+                        } else {
+                            new_que.push_back(std::move(it));
+                        }
                     }
+                } else {
+                    new_que.push_back(std::move(it));
                 }
             }
             new_que.swap(_que);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index de2e81bcda3..580cb8368c8 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -107,6 +107,7 @@ public:
     BasicSharedState* shared_state() { return _shared_state; }
     void set_shared_state(BasicSharedState* shared_state) { _shared_state = 
shared_state; }
     virtual std::string debug_string(int indentation_level = 0);
+    bool ready() const { return _ready; }
 
     // Start the watcher. We use it to count how long this dependency block 
the current pipeline task.
     void start_watcher() { _watcher.start(); }
@@ -231,11 +232,19 @@ public:
     int64_t registration_time() const { return _registration_time; }
     int32_t wait_time_ms() const { return _wait_time_ms; }
 
+    void set_local_runtime_filter_dependencies(
+            const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) 
{
+        _local_runtime_filter_dependencies = deps;
+    }
+
+    bool should_be_check_timeout();
+
 private:
     friend struct RuntimeFilterTimerQueue;
     std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
+    std::vector<std::shared_ptr<RuntimeFilterDependency>> 
_local_runtime_filter_dependencies;
     std::mutex _lock;
-    const int64_t _registration_time;
+    int64_t _registration_time;
     const int32_t _wait_time_ms;
 };
 
@@ -258,11 +267,9 @@ struct RuntimeFilterTimerQueue {
 
     ~RuntimeFilterTimerQueue() = default;
     RuntimeFilterTimerQueue() { _thread = 
std::thread(&RuntimeFilterTimerQueue::start, this); }
-    void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> 
filter) { push(filter); }
-
-    void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
+    void 
push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& 
filter) {
         std::unique_lock<std::mutex> lc(_que_lock);
-        _que.push_back(filter);
+        _que.insert(_que.end(), filter.begin(), filter.end());
         cv.notify_all();
     }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 0c6766b8bca..f80be824d58 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -67,17 +67,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
                 runtime_filter_dependencies,
         const int id, const int node_id, const std::string& name) {
     runtime_filter_dependencies.resize(_runtime_filter_descs.size());
+    std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> 
runtime_filter_timers(
+            _runtime_filter_descs.size());
+    std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>
+            local_runtime_filter_dependencies;
+
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
         runtime_filter_dependencies[i] = 
std::make_shared<pipeline::RuntimeFilterDependency>(
                 id, node_id, name, runtime_filter);
         _runtime_filter_ctxs[i].runtime_filter_dependency = 
runtime_filter_dependencies[i].get();
-        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+        runtime_filter_timers[i] = 
std::make_shared<pipeline::RuntimeFilterTimer>(
                 runtime_filter->registration_time(), 
runtime_filter->wait_time_ms(),
                 runtime_filter_dependencies[i]);
-        runtime_filter->set_filter_timer(filter_timer);
-        
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
+        runtime_filter->set_filter_timer(runtime_filter_timers[i]);
+        if (runtime_filter->has_local_target()) {
+            
local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]);
+        }
+    }
+
+    // The gloabl runtime filter timer need set local runtime filter 
dependencies.
+    // start to wait before the local runtime filter ready
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
+        if (!runtime_filter->has_local_target()) {
+            runtime_filter_timers[i]->set_local_runtime_filter_dependencies(
+                    local_runtime_filter_dependencies);
+        }
     }
+    ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(
+            std::move(runtime_filter_timers));
 }
 
 Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to