This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9d839489872 [feature](pipelineX) Use dependency instead of block queue in the runtime filter (#26078) 9d839489872 is described below commit 9d8394898722953284ffe69e6b575bea09940815 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Tue Oct 31 22:44:18 2023 +0800 [feature](pipelineX) Use dependency instead of block queue in the runtime filter (#26078) --- be/src/exprs/runtime_filter.cpp | 11 +- be/src/exprs/runtime_filter.h | 17 ++++ .../exec/multi_cast_data_stream_source.cpp | 3 +- be/src/pipeline/exec/scan_operator.cpp | 3 +- be/src/pipeline/pipeline_x/dependency.cpp | 113 +++++++++++++++++++++ be/src/pipeline/pipeline_x/dependency.h | 61 +++++++++-- be/src/pipeline/pipeline_x/operator.cpp | 2 +- be/src/pipeline/pipeline_x/operator.h | 4 +- be/src/pipeline/pipeline_x/pipeline_x_task.h | 2 +- be/src/vec/exec/runtime_filter_consumer.cpp | 21 +++- be/src/vec/exec/runtime_filter_consumer.h | 5 +- 11 files changed, 216 insertions(+), 26 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index c6e64fd0e55..10c2856112b 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -42,6 +42,7 @@ #include "exprs/hybrid_set.h" #include "exprs/minmax_predicate.h" #include "gutil/strings/substitute.h" +#include "pipeline/pipeline_x/dependency.h" #include "runtime/define_primitive_type.h" #include "runtime/large_int_value.h" #include "runtime/primitive_type.h" @@ -62,7 +63,6 @@ #include "vec/exprs/vliteral.h" #include "vec/exprs/vruntimefilter_wrapper.h" #include "vec/runtime/shared_hash_table_controller.h" - namespace doris { // PrimitiveType-> PColumnType @@ -1235,6 +1235,11 @@ void IRuntimeFilter::signal() { DCHECK(is_consumer()); if (_enable_pipeline_exec) { _rf_state_atomic.store(RuntimeFilterState::READY); + if (!_filter_timer.empty()) { + for (auto& timer : _filter_timer) { + timer->call_ready(); + } + } } else { 
std::unique_lock lock(_inner_mutex); _rf_state = RuntimeFilterState::READY; @@ -1255,6 +1260,10 @@ void IRuntimeFilter::signal() { } } +void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) { + _filter_timer.push_back(timer); +} + BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { return _wrapper->get_bloomfilter(); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index fdd3b02ad63..f13877b869c 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -69,6 +69,10 @@ class VExprContext; struct SharedRuntimeFilterContext; } // namespace vectorized +namespace pipeline { +class RuntimeFilterTimer; +} // namespace pipeline + enum class RuntimeFilterType { UNKNOWN_FILTER = -1, IN_FILTER = 0, @@ -384,6 +388,17 @@ public: } } + int32_t wait_time_ms() { + auto runtime_filter_wait_time_ms = _state == nullptr + ? _query_ctx->runtime_filter_wait_time_ms() + : _state->runtime_filter_wait_time_ms(); + return runtime_filter_wait_time_ms; + } + + int64_t registration_time() const { return registration_time_; } + + void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>); + protected: // serialize _wrapper to protobuf void to_protobuf(PInFilter* filter); @@ -475,6 +490,8 @@ protected: // only effect on consumer std::unique_ptr<RuntimeProfile> _profile; bool _opt_remote_rf; + + std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; }; // avoid expose RuntimePredicateWrapper diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 61038baa328..4c6e21c5c6d 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -145,8 +145,7 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState } // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); - 
_filter_dependency->set_filter_blocked_by_fn( - [this]() { return this->runtime_filters_are_ready_or_timeout(); }); + init_runtime_filter_dependency(_filter_dependency.get()); return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 075090916a8..601bf5b8b9f 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -128,8 +128,6 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) _source_dependency->add_child(_open_dependency); _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id()); _source_dependency->add_child(_eos_dependency); - _filter_dependency->set_filter_blocked_by_fn( - [this]() { return this->runtime_filters_are_ready_or_timeout(); }); auto& p = _parent->cast<typename Derived::Parent>(); set_scan_ranges(state, info.scan_ranges); _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); @@ -143,6 +141,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) } // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); + init_runtime_filter_dependency(_filter_dependency.get()); // 1: running at not pipeline mode will init profile. // 2: the scan node should create scanner at pipeline mode will init profile. 
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index d56679f32a2..32bd06f5983 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -17,6 +17,10 @@ #include "dependency.h" +#include <memory> +#include <mutex> + +#include "common/logging.h" #include "runtime/memory/mem_tracker.h" namespace doris::pipeline { @@ -326,4 +330,113 @@ Status HashJoinDependency::extract_join_column(vectorized::Block& block, return Status::OK(); } +bool RuntimeFilterTimer::has_ready() { + std::unique_lock<std::mutex> lc(_lock); + return _runtime_filter->is_ready(); +} + +void RuntimeFilterTimer::call_timeout() { + std::unique_lock<std::mutex> lc(_lock); + if (_call_ready) { + return; + } + _call_timeout = true; + if (_parent) { + _parent->sub_filters(); + } +} + +void RuntimeFilterTimer::call_ready() { + std::unique_lock<std::mutex> lc(_lock); + if (_call_timeout) { + return; + } + _call_ready = true; + if (_parent) { + _parent->sub_filters(); + } +} + +void RuntimeFilterTimer::call_has_ready() { + std::unique_lock<std::mutex> lc(_lock); + DCHECK(!_call_timeout); + if (!_call_ready) { + _parent->sub_filters(); + } +} + +void RuntimeFilterTimer::call_has_release() { + // When the use count is equal to 1, only the timer queue still holds ownership, + // so there is no need to take any action. 
+} + +struct RuntimeFilterTimerQueue { + constexpr static int64_t interval = 50; + void start() { + while (true) { + std::unique_lock<std::mutex> lk(cv_m); + + cv.wait(lk, [this] { return !_que.empty(); }); + { + std::unique_lock<std::mutex> lc(_que_lock); + std::list<std::shared_ptr<RuntimeFilterTimer>> new_que; + for (auto& it : _que) { + if (it.use_count() == 1) { + it->call_has_release(); + } else if (it->has_ready()) { + it->call_has_ready(); + } else { + int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); + if (ms_since_registration > it->wait_time_ms()) { + it->call_timeout(); + } else { + new_que.push_back(std::move(it)); + } + } + } + new_que.swap(_que); + } + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + } + ~RuntimeFilterTimerQueue() { _thread.detach(); } + RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } + static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) { + static RuntimeFilterTimerQueue timer_que; + + timer_que.push(filter); + } + + void push(std::shared_ptr<RuntimeFilterTimer> filter) { + std::unique_lock<std::mutex> lc(_que_lock); + _que.push_back(filter); + cv.notify_all(); + } + + std::thread _thread; + std::condition_variable cv; + std::mutex cv_m; + std::mutex _que_lock; + + std::list<std::shared_ptr<RuntimeFilterTimer>> _que; +}; + +void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { + _filters++; + int64_t registration_time = runtime_filter->registration_time(); + int32 wait_time_ms = runtime_filter->wait_time_ms(); + auto filter_timer = std::make_shared<RuntimeFilterTimer>( + registration_time, wait_time_ms, + std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()), runtime_filter); + runtime_filter->set_filter_timer(filter_timer); + RuntimeFilterTimerQueue::push_filter_timer(filter_timer); +} + +void RuntimeFilterDependency::sub_filters() { + _filters--; + if (_filters == 0) { + 
*_blocked_by_rf = false; + } +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 131198c4496..d5349307e5b 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -19,11 +19,16 @@ #include <sqltypes.h> +#include <atomic> #include <functional> #include <memory> #include <mutex> +#include <thread> +#include <utility> +#include "common/logging.h" #include "concurrentqueue.h" +#include "gutil/integral_types.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/multi_cast_data_streamer.h" #include "vec/common/hash_table/hash_map_context_creator.h" @@ -194,30 +199,64 @@ protected: const int _node_id; }; -class FilterDependency final : public Dependency { +class RuntimeFilterDependency; +class RuntimeFilterTimer { public: - FilterDependency(int id, int node_id, std::string name) - : Dependency(id, name), - _runtime_filters_are_ready_or_timeout(nullptr), - _node_id(node_id) {} + RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, + std::shared_ptr<RuntimeFilterDependency> parent, + IRuntimeFilter* runtime_filter) + : _parent(std::move(parent)), + _registration_time(registration_time), + _wait_time_ms(wait_time_ms), + _runtime_filter(runtime_filter) {} - FilterDependency* filter_blocked_by() { - if (!_runtime_filters_are_ready_or_timeout) { + void call_ready(); + + void call_timeout(); + + void call_has_ready(); + + void call_has_release(); + + bool has_ready(); + + int64_t registration_time() const { return _registration_time; } + int32_t wait_time_ms() const { return _wait_time_ms; } + +private: + bool _call_ready {}; + bool _call_timeout {}; + std::shared_ptr<RuntimeFilterDependency> _parent; + std::mutex _lock; + const int64_t _registration_time; + const int32_t _wait_time_ms; + IRuntimeFilter* _runtime_filter; +}; +class RuntimeFilterDependency final : public Dependency { +public: + RuntimeFilterDependency(int id, 
int node_id, std::string name) + : Dependency(id, name), _node_id(node_id) {} + + RuntimeFilterDependency* filter_blocked_by() { + if (!_blocked_by_rf) { return nullptr; } - if (!_runtime_filters_are_ready_or_timeout()) { + if (*_blocked_by_rf) { return this; } return nullptr; } void* shared_state() override { return nullptr; } - void set_filter_blocked_by_fn(std::function<bool()> call_fn) { - _runtime_filters_are_ready_or_timeout = call_fn; + void add_filters(IRuntimeFilter* runtime_filter); + void sub_filters(); + void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) { + _blocked_by_rf = blocked_by_rf; } protected: - std::function<bool()> _runtime_filters_are_ready_or_timeout; const int _node_id; + std::atomic_int _filters; + std::shared_ptr<std::atomic_bool> _blocked_by_rf; }; class AndDependency final : public WriteDependency { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 88b21b754fe..c1dab07390d 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -315,7 +315,7 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _state(state), _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY")) { - _filter_dependency = std::make_unique<FilterDependency>( + _filter_dependency = std::make_shared<RuntimeFilterDependency>( parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY"); } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index d2f170f9f28..5691d989e47 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -100,7 +100,7 @@ public: virtual Dependency* dependency() { return nullptr; } FinishDependency* finishdependency() { return _finish_dependency.get(); } - FilterDependency* filterdependency() { return _filter_dependency.get(); } + 
RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); } protected: friend class OperatorXBase; @@ -134,7 +134,7 @@ protected: bool _closed = false; vectorized::Block _origin_block; std::shared_ptr<FinishDependency> _finish_dependency; - std::unique_ptr<FilterDependency> _filter_dependency; + std::shared_ptr<RuntimeFilterDependency> _filter_dependency; }; class OperatorXBase : public OperatorBase { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 3a33431192e..90fdda921f0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -172,7 +172,7 @@ private: std::vector<Dependency*> _read_dependencies; WriteDependency* _write_dependencies; std::vector<FinishDependency*> _finish_dependencies; - FilterDependency* _filter_dependency; + RuntimeFilterDependency* _filter_dependency; DependencyMap _upstream_dependency; diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index c3bc8c0e22c..9eda2788f06 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -26,7 +26,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, : _filter_id(filter_id), _runtime_filter_descs(runtime_filters), _row_descriptor_ref(row_descriptor), - _conjuncts_ref(conjuncts) {} + _conjuncts_ref(conjuncts) { + _blocked_by_rf = std::make_shared<std::atomic_bool>(false); +} Status RuntimeFilterConsumer::init(RuntimeState* state) { _state = state; @@ -72,7 +74,7 @@ Status RuntimeFilterConsumer::_register_runtime_filter() { } bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { - if (!_blocked_by_rf) { + if (!*_blocked_by_rf) { return true; } for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { @@ -81,10 +83,19 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { return false; } } - _blocked_by_rf = false; + 
*_blocked_by_rf = false; return true; } +void RuntimeFilterConsumer::init_runtime_filter_dependency( + doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) { + _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf); + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; + _runtime_filter_dependency->add_filters(runtime_filter); + } +} + Status RuntimeFilterConsumer::_acquire_runtime_filter() { SCOPED_TIMER(_acquire_runtime_filter_timer); VExprSPtrs vexprs; @@ -99,14 +110,14 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() { _runtime_filter_ctxs[i].apply_mark = true; } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && !_runtime_filter_ctxs[i].apply_mark) { - _blocked_by_rf = true; + *_blocked_by_rf = true; } else if (!_runtime_filter_ctxs[i].apply_mark) { DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); _is_all_rf_applied = false; } } RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs)); - if (_blocked_by_rf) { + if (*_blocked_by_rf) { return Status::WaitForRf("Runtime filters are neither not ready nor timeout"); } diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index ed7a097901e..fcb6ed3c839 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -19,6 +19,7 @@ #include "exec/exec_node.h" #include "exprs/runtime_filter.h" +#include "pipeline/pipeline_x/dependency.h" namespace doris::vectorized { @@ -37,6 +38,8 @@ public: bool runtime_filters_are_ready_or_timeout(); + void init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*); + protected: // Register and get all runtime filters at Init phase. 
Status _register_runtime_filter(); @@ -77,7 +80,7 @@ private: // True means all runtime filters are applied to scanners bool _is_all_rf_applied = true; - bool _blocked_by_rf = false; + std::shared_ptr<std::atomic_bool> _blocked_by_rf; RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org