This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d839489872 [feature](pipelineX) Use dependency instead of block queue 
in the runtime filter (#26078)
9d839489872 is described below

commit 9d8394898722953284ffe69e6b575bea09940815
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Tue Oct 31 22:44:18 2023 +0800

    [feature](pipelineX) Use dependency instead of block queue in the runtime 
filter (#26078)
---
 be/src/exprs/runtime_filter.cpp                    |  11 +-
 be/src/exprs/runtime_filter.h                      |  17 ++++
 .../exec/multi_cast_data_stream_source.cpp         |   3 +-
 be/src/pipeline/exec/scan_operator.cpp             |   3 +-
 be/src/pipeline/pipeline_x/dependency.cpp          | 113 +++++++++++++++++++++
 be/src/pipeline/pipeline_x/dependency.h            |  61 +++++++++--
 be/src/pipeline/pipeline_x/operator.cpp            |   2 +-
 be/src/pipeline/pipeline_x/operator.h              |   4 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |   2 +-
 be/src/vec/exec/runtime_filter_consumer.cpp        |  21 +++-
 be/src/vec/exec/runtime_filter_consumer.h          |   5 +-
 11 files changed, 216 insertions(+), 26 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c6e64fd0e55..10c2856112b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -42,6 +42,7 @@
 #include "exprs/hybrid_set.h"
 #include "exprs/minmax_predicate.h"
 #include "gutil/strings/substitute.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/large_int_value.h"
 #include "runtime/primitive_type.h"
@@ -1235,6 +1236,16 @@ void IRuntimeFilter::signal() {
     DCHECK(is_consumer());
     if (_enable_pipeline_exec) {
         _rf_state_atomic.store(RuntimeFilterState::READY);
+        // set_filter_timer() may append concurrently, so notify from a snapshot taken under
+        // the lock; a timer attached after the snapshot should still be caught by has_ready().
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
+        {
+            std::unique_lock lock(_inner_mutex);
+            timers = _filter_timer;
+        }
+        for (auto& timer : timers) {
+            timer->call_ready();
+        }
     } else {
         std::unique_lock lock(_inner_mutex);
         _rf_state = RuntimeFilterState::READY;
@@ -1255,6 +1266,11 @@ void IRuntimeFilter::signal() {
     }
 }
 
+void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+    std::unique_lock lock(_inner_mutex);
+    _filter_timer.push_back(std::move(timer));
+}
+
 BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
     return _wrapper->get_bloomfilter();
 }
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index fdd3b02ad63..f13877b869c 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -69,6 +69,10 @@ class VExprContext;
 struct SharedRuntimeFilterContext;
 } // namespace vectorized
 
+namespace pipeline {
+class RuntimeFilterTimer;
+} // namespace pipeline
+
 enum class RuntimeFilterType {
     UNKNOWN_FILTER = -1,
     IN_FILTER = 0,
@@ -384,6 +388,18 @@ public:
         }
     }
 
+    // Wait budget in ms for this filter (RuntimeFilterTimer uses it as its timeout): read
+    // from the RuntimeState when one is attached, otherwise from the query context.
+    int32_t wait_time_ms() const {
+        return _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms()
+                                 : _state->runtime_filter_wait_time_ms();
+    }
+
+    int64_t registration_time() const { return registration_time_; }
+
+    // Attach a pipelineX timer; in pipeline mode signal() calls call_ready() on each one.
+    void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer);
+
 protected:
     // serialize _wrapper to protobuf
     void to_protobuf(PInFilter* filter);
@@ -475,6 +491,9 @@ protected:
     // only effect on consumer
     std::unique_ptr<RuntimeProfile> _profile;
     bool _opt_remote_rf;
+
+    // Timers attached through set_filter_timer().
+    std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
 };
 
 // avoid expose RuntimePredicateWrapper
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 61038baa328..4c6e21c5c6d 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -145,8 +145,7 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     }
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
-    _filter_dependency->set_filter_blocked_by_fn(
-            [this]() { return this->runtime_filters_are_ready_or_timeout(); });
+    init_runtime_filter_dependency(_filter_dependency.get()); // replaces the polled set_filter_blocked_by_fn() callback
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 075090916a8..601bf5b8b9f 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -128,8 +128,6 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     _source_dependency->add_child(_open_dependency);
     _eos_dependency = 
EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
     _source_dependency->add_child(_eos_dependency);
-    _filter_dependency->set_filter_blocked_by_fn(
-            [this]() { return this->runtime_filters_are_ready_or_timeout(); });
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(state, info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -143,6 +141,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     }
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
+    init_runtime_filter_dependency(_filter_dependency.get()); // replaces the removed set_filter_blocked_by_fn() callback
 
     // 1: running at not pipeline mode will init profile.
     // 2: the scan node should create scanner at pipeline mode will init 
profile.
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index d56679f32a2..32bd06f5983 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -17,6 +17,14 @@
 
 #include "dependency.h"
 
+#include <chrono>
+#include <condition_variable>
+#include <list>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "common/logging.h"
 #include "runtime/memory/mem_tracker.h"
 
 namespace doris::pipeline {
@@ -326,4 +334,152 @@ Status HashJoinDependency::extract_join_column(vectorized::Block& block,
     return Status::OK();
 }
 
+bool RuntimeFilterTimer::has_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    return _runtime_filter->is_ready();
+}
+
+// The filter's wait time elapsed. Ignored when readiness or a timeout was already
+// reported, so the parent's pending-filter counter is decremented at most once.
+void RuntimeFilterTimer::call_timeout() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_ready || _call_timeout) {
+        return;
+    }
+    _call_timeout = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+// The filter became ready. Ignored when readiness or a timeout was already
+// reported, so the parent's pending-filter counter is decremented at most once.
+void RuntimeFilterTimer::call_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_ready || _call_timeout) {
+        return;
+    }
+    _call_ready = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+// The timer queue observed has_ready(). IRuntimeFilter::signal() may already have
+// called (or may concurrently call) call_ready(); routing through call_ready() records
+// _call_ready so the two paths cannot both decrement the parent's counter.
+void RuntimeFilterTimer::call_has_ready() {
+    {
+        std::unique_lock<std::mutex> lc(_lock);
+        DCHECK(!_call_timeout);
+    }
+    call_ready();
+}
+
+void RuntimeFilterTimer::call_has_release() {
+    // When the use count is equal to 1, only the timer queue still holds ownership,
+    // so there is no need to take any action.
+}
+
+// Single background poller shared by all RuntimeFilterTimers. Every `interval` ms it
+// reports timers whose filter is ready, times out timers whose wait time has elapsed,
+// and drops timers that no one else references any more.
+struct RuntimeFilterTimerQueue {
+    constexpr static int64_t interval = 50;
+
+    RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
+
+    // Stop and join the worker. Detaching it would leave it running against this
+    // function-local static after it is destroyed at process exit.
+    ~RuntimeFilterTimerQueue() {
+        {
+            std::unique_lock<std::mutex> lk(_que_lock);
+            _stop = true;
+        }
+        _cv.notify_all();
+        if (_thread.joinable()) {
+            _thread.join();
+        }
+    }
+
+    static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
+        static RuntimeFilterTimerQueue timer_que;
+        timer_que.push(std::move(filter));
+    }
+
+    void push(std::shared_ptr<RuntimeFilterTimer> filter) {
+        {
+            std::unique_lock<std::mutex> lk(_que_lock);
+            _que.push_back(std::move(filter));
+        }
+        _cv.notify_all();
+    }
+
+    void start() {
+        std::unique_lock<std::mutex> lk(_que_lock);
+        while (true) {
+            // _que is only ever read or written under _que_lock (the same mutex push() uses).
+            _cv.wait(lk, [this] { return _stop || !_que.empty(); });
+            if (_stop) {
+                return;
+            }
+            // Work on a private batch without the lock so push() never waits on callbacks.
+            std::list<std::shared_ptr<RuntimeFilterTimer>> batch;
+            batch.swap(_que);
+            lk.unlock();
+
+            std::list<std::shared_ptr<RuntimeFilterTimer>> still_waiting;
+            for (auto& timer : batch) {
+                // NOTE(review): _runtime_filter is a raw pointer; confirm the filter cannot be
+                // destroyed between this use_count() check and has_ready().
+                if (timer.use_count() == 1) {
+                    timer->call_has_release();
+                } else if (timer->has_ready()) {
+                    timer->call_has_ready();
+                } else {
+                    int64_t ms_since_registration =
+                            MonotonicMillis() - timer->registration_time();
+                    if (ms_since_registration > timer->wait_time_ms()) {
+                        timer->call_timeout();
+                    } else {
+                        still_waiting.push_back(std::move(timer));
+                    }
+                }
+            }
+
+            lk.lock();
+            _que.splice(_que.end(), still_waiting);
+            // Poll again after `interval`; only shutdown ends the wait early.
+            if (_cv.wait_for(lk, std::chrono::milliseconds(interval), [this] { return _stop; })) {
+                return;
+            }
+        }
+    }
+
+    std::condition_variable _cv;
+    std::mutex _que_lock;
+    bool _stop = false;                                  // guarded by _que_lock
+    std::list<std::shared_ptr<RuntimeFilterTimer>> _que; // guarded by _que_lock
+    std::thread _thread;
+};
+
+void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
+    _filters++;
+    int64_t registration_time = runtime_filter->registration_time();
+    int32_t wait_time_ms = runtime_filter->wait_time_ms();
+    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
+            registration_time, wait_time_ms,
+            std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()), runtime_filter);
+    runtime_filter->set_filter_timer(filter_timer);
+    RuntimeFilterTimerQueue::push_filter_timer(std::move(filter_timer));
+}
+
+void RuntimeFilterDependency::sub_filters() {
+    // Test the value produced by this decrement itself; re-reading the counter could
+    // observe another thread's decrement as well.
+    if (--_filters == 0 && _blocked_by_rf) {
+        *_blocked_by_rf = false;
+    }
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 131198c4496..d5349307e5b 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -19,11 +19,16 @@
 
 #include <sqltypes.h>
 
+#include <atomic>
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <thread>
+#include <utility>
 
+#include "common/logging.h"
 #include "concurrentqueue.h"
+#include "gutil/integral_types.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/multi_cast_data_streamer.h"
 #include "vec/common/hash_table/hash_map_context_creator.h"
@@ -194,30 +199,77 @@ protected:
     const int _node_id;
 };
 
-class FilterDependency final : public Dependency {
+class RuntimeFilterDependency;
+
+// Follows one runtime filter on behalf of a RuntimeFilterDependency and reports to it,
+// via sub_filters(), when the filter becomes ready or its wait time elapses.
+class RuntimeFilterTimer {
 public:
-    FilterDependency(int id, int node_id, std::string name)
-            : Dependency(id, name),
-              _runtime_filters_are_ready_or_timeout(nullptr),
-              _node_id(node_id) {}
+    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
+                       std::shared_ptr<RuntimeFilterDependency> parent,
+                       IRuntimeFilter* runtime_filter)
+            : _parent(std::move(parent)),
+              _registration_time(registration_time),
+              _wait_time_ms(wait_time_ms),
+              _runtime_filter(runtime_filter) {}
 
-    FilterDependency* filter_blocked_by() {
-        if (!_runtime_filters_are_ready_or_timeout) {
+    // Called from IRuntimeFilter::signal() when the filter arrives.
+    void call_ready();
+
+    // Called by the timer queue once wait_time_ms() has elapsed since registration.
+    void call_timeout();
+
+    // Called by the timer queue when it observes has_ready() == true.
+    void call_has_ready();
+
+    // Called by the timer queue when it holds the only reference to this timer.
+    void call_has_release();
+
+    bool has_ready();
+
+    int64_t registration_time() const { return _registration_time; }
+    int32_t wait_time_ms() const { return _wait_time_ms; }
+
+private:
+    bool _call_ready {};
+    bool _call_timeout {};
+    std::shared_ptr<RuntimeFilterDependency> _parent;
+    std::mutex _lock; // guards _call_ready / _call_timeout
+    const int64_t _registration_time;
+    const int32_t _wait_time_ms;
+    IRuntimeFilter* _runtime_filter; // non-owning
+};
+
+// Blocks the pipelineX task while the runtime filters it waits for are still pending.
+class RuntimeFilterDependency final : public Dependency {
+public:
+    RuntimeFilterDependency(int id, int node_id, std::string name)
+            : Dependency(id, name), _node_id(node_id) {}
+
+    RuntimeFilterDependency* filter_blocked_by() {
+        if (!_blocked_by_rf) {
             return nullptr;
         }
-        if (!_runtime_filters_are_ready_or_timeout()) {
+        if (*_blocked_by_rf) {
             return this;
         }
         return nullptr;
     }
     void* shared_state() override { return nullptr; }
-    void set_filter_blocked_by_fn(std::function<bool()> call_fn) {
-        _runtime_filters_are_ready_or_timeout = call_fn;
+    // Registers a filter with this dependency and schedules a RuntimeFilterTimer for it.
+    void add_filters(IRuntimeFilter* runtime_filter);
+    // Records that one registered filter became ready or timed out.
+    void sub_filters();
+    // Shares the consumer's "blocked by runtime filter" flag with this dependency.
+    void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
+        _blocked_by_rf = std::move(blocked_by_rf);
     }
 
 protected:
-    std::function<bool()> _runtime_filters_are_ready_or_timeout;
     const int _node_id;
+    // Registered filters not yet reported ready/timed out; explicitly zeroed because a
+    // default-constructed std::atomic is uninitialized before C++20.
+    std::atomic_int _filters {0};
+    std::shared_ptr<std::atomic_bool> _blocked_by_rf;
 };
 
 class AndDependency final : public WriteDependency {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 88b21b754fe..c1dab07390d 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -315,7 +315,7 @@ 
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
           _state(state),
           _finish_dependency(new FinishDependency(parent->operator_id(), 
parent->node_id(),
                                                   parent->get_name() + 
"_FINISH_DEPENDENCY")) {
-    _filter_dependency = std::make_unique<FilterDependency>(
+    _filter_dependency = std::make_shared<RuntimeFilterDependency>( // shared_ptr: add_filters() relies on shared_from_this()
             parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY");
 }
 
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index d2f170f9f28..5691d989e47 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -100,7 +100,7 @@ public:
     virtual Dependency* dependency() { return nullptr; }
 
     FinishDependency* finishdependency() { return _finish_dependency.get(); }
-    FilterDependency* filterdependency() { return _filter_dependency.get(); }
+    RuntimeFilterDependency* filterdependency() { return 
_filter_dependency.get(); } // non-owning; ownership stays with _filter_dependency
 
 protected:
     friend class OperatorXBase;
@@ -134,7 +134,7 @@ protected:
     bool _closed = false;
     vectorized::Block _origin_block;
     std::shared_ptr<FinishDependency> _finish_dependency;
-    std::unique_ptr<FilterDependency> _filter_dependency;
+    std::shared_ptr<RuntimeFilterDependency> _filter_dependency; // shared_ptr: each RuntimeFilterTimer also holds its parent dependency
 };
 
 class OperatorXBase : public OperatorBase {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 3a33431192e..90fdda921f0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -172,7 +172,7 @@ private:
     std::vector<Dependency*> _read_dependencies;
     WriteDependency* _write_dependencies;
     std::vector<FinishDependency*> _finish_dependencies;
-    FilterDependency* _filter_dependency;
+    RuntimeFilterDependency* _filter_dependency; // non-owning; presumably obtained via filterdependency() — confirm
 
     DependencyMap _upstream_dependency;
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp
index c3bc8c0e22c..9eda2788f06 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -26,7 +26,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
         : _filter_id(filter_id),
           _runtime_filter_descs(runtime_filters),
           _row_descriptor_ref(row_descriptor),
-          _conjuncts_ref(conjuncts) {}
+          _conjuncts_ref(conjuncts) {
+    _blocked_by_rf = std::make_shared<std::atomic_bool>(false);
+}
 
 Status RuntimeFilterConsumer::init(RuntimeState* state) {
     _state = state;
@@ -72,7 +74,7 @@ Status RuntimeFilterConsumer::_register_runtime_filter() {
 }
 
 bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
-    if (!_blocked_by_rf) {
+    if (!*_blocked_by_rf) {
         return true;
     }
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
@@ -81,10 +83,22 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
             return false;
         }
     }
-    _blocked_by_rf = false;
+    *_blocked_by_rf = false;
     return true;
 }
 
+// Share the "blocked by runtime filter" flag with the pipelineX dependency and register
+// every consumed runtime filter with it, so the dependency can clear the flag once all
+// of them are ready or timed out.
+void RuntimeFilterConsumer::init_runtime_filter_dependency(
+        doris::pipeline::RuntimeFilterDependency* dependency) {
+    dependency->set_blocked_by_rf(_blocked_by_rf);
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
+        dependency->add_filters(runtime_filter);
+    }
+}
+
 Status RuntimeFilterConsumer::_acquire_runtime_filter() {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     VExprSPtrs vexprs;
@@ -99,14 +113,14 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
             _runtime_filter_ctxs[i].apply_mark = true;
         } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
                    !_runtime_filter_ctxs[i].apply_mark) {
-            _blocked_by_rf = true;
+            *_blocked_by_rf = true;
         } else if (!_runtime_filter_ctxs[i].apply_mark) {
             DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY);
             _is_all_rf_applied = false;
         }
     }
     RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
-    if (_blocked_by_rf) {
+    if (*_blocked_by_rf) {
         return Status::WaitForRf("Runtime filters are neither not ready nor timeout");
     }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h
index ed7a097901e..fcb6ed3c839 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -19,6 +19,7 @@
 
 #include "exec/exec_node.h"
 #include "exprs/runtime_filter.h"
+#include "pipeline/pipeline_x/dependency.h"
 
 namespace doris::vectorized {
 
@@ -37,6 +38,8 @@ public:
 
     bool runtime_filters_are_ready_or_timeout();
 
+    void 
init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*); // shares _blocked_by_rf and registers every RF with the dependency
+
 protected:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter();
@@ -77,7 +80,7 @@ private:
 
     // True means all runtime filters are applied to scanners
     bool _is_all_rf_applied = true;
-    bool _blocked_by_rf = false;
+    std::shared_ptr<std::atomic_bool> _blocked_by_rf; // shared with RuntimeFilterDependency via set_blocked_by_rf()
 
     RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to