Gabriel39 commented on code in PR #26078: URL: https://github.com/apache/doris/pull/26078#discussion_r1376981760
########## be/src/vec/exec/runtime_filter_consumer.cpp: ########## @@ -26,7 +26,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, : _filter_id(filter_id), _runtime_filter_descs(runtime_filters), _row_descriptor_ref(row_descriptor), - _conjuncts_ref(conjuncts) {} + _conjuncts_ref(conjuncts) { + _blocked_by_rf = std::make_shared<std::atomic_bool>(false); Review Comment: _blocked_by_rf(false) ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -194,30 +199,67 @@ class FinishDependency final : public Dependency { const int _node_id; }; -class FilterDependency final : public Dependency { +class RuntimeFilterDependency; +class RuntimeFilterTimer { public: - FilterDependency(int id, int node_id, std::string name) - : Dependency(id, name), - _runtime_filters_are_ready_or_timeout(nullptr), - _node_id(node_id) {} + RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, + std::shared_ptr<RuntimeFilterDependency> parent, + IRuntimeFilter* runtime_filter) + : _parent(std::move(parent)), + _registration_time(registration_time), + _wait_time_ms(wait_time_ms), + _runtime_filter(runtime_filter) {} - FilterDependency* filter_blocked_by() { - if (!_runtime_filters_are_ready_or_timeout) { + void call_ready(); + + void call_timeout(); + + void call_has_ready(); + + void call_has_release(); + + bool has_ready(); + + int64_t registration_time() const { return _registration_time; } + int32_t wait_time_ms() const { return _wait_time_ms; } + +private: + bool _call_ready {}; + bool _call_timeout {}; + std::shared_ptr<RuntimeFilterDependency> _parent; + std::mutex _lock; + const int64_t _registration_time; + const int32_t _wait_time_ms; + IRuntimeFilter* _runtime_filter; +}; +class RuntimeFilterDependency final : public Dependency { +public: + RuntimeFilterDependency(int id, int node_id, std::string name) + : Dependency(id, name), _node_id(node_id) {} + + RuntimeFilterDependency* filter_blocked_by() { + if (!_blocked_by_rf) { return nullptr; 
} - if (!_runtime_filters_are_ready_or_timeout()) { - return this; + if (_filters == 0) { + return nullptr; } - return nullptr; + return this; } void* shared_state() override { return nullptr; } - void set_filter_blocked_by_fn(std::function<bool()> call_fn) { - _runtime_filters_are_ready_or_timeout = call_fn; + void add_filters(IRuntimeFilter* runtime_filter); + void sub_filters(); + void call_task_ready() { + /// TODO: + } + void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) { + _blocked_by_rf = blocked_by_rf; } protected: - std::function<bool()> _runtime_filters_are_ready_or_timeout; const int _node_id; + std::atomic_int _filters; Review Comment: We use both `_filters` and `_blocked_by_rf` to find out whether this dependency is ready. But I think it is enough to use either one. ########## be/src/pipeline/pipeline_x/operator.cpp: ########## @@ -315,7 +315,7 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _state(state), _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY")) { - _filter_dependency = std::make_unique<FilterDependency>( + _filter_dependency = std::make_shared<RuntimeFilterDependency>( Review Comment: `_filter_dependency` is only held in PipelineXLocalStateBase. So is it enough to use a unique pointer?
########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -194,30 +199,67 @@ class FinishDependency final : public Dependency { const int _node_id; }; -class FilterDependency final : public Dependency { +class RuntimeFilterDependency; +class RuntimeFilterTimer { public: - FilterDependency(int id, int node_id, std::string name) - : Dependency(id, name), - _runtime_filters_are_ready_or_timeout(nullptr), - _node_id(node_id) {} + RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, + std::shared_ptr<RuntimeFilterDependency> parent, + IRuntimeFilter* runtime_filter) + : _parent(std::move(parent)), + _registration_time(registration_time), + _wait_time_ms(wait_time_ms), + _runtime_filter(runtime_filter) {} - FilterDependency* filter_blocked_by() { - if (!_runtime_filters_are_ready_or_timeout) { + void call_ready(); + + void call_timeout(); + + void call_has_ready(); + + void call_has_release(); + + bool has_ready(); + + int64_t registration_time() const { return _registration_time; } + int32_t wait_time_ms() const { return _wait_time_ms; } + +private: + bool _call_ready {}; + bool _call_timeout {}; + std::shared_ptr<RuntimeFilterDependency> _parent; + std::mutex _lock; + const int64_t _registration_time; + const int32_t _wait_time_ms; + IRuntimeFilter* _runtime_filter; +}; +class RuntimeFilterDependency final : public Dependency { +public: + RuntimeFilterDependency(int id, int node_id, std::string name) + : Dependency(id, name), _node_id(node_id) {} + + RuntimeFilterDependency* filter_blocked_by() { + if (!_blocked_by_rf) { Review Comment: if (!_blocked_by_rf || _filters == 0) { ########## be/src/exprs/runtime_filter.h: ########## @@ -384,6 +388,17 @@ class IRuntimeFilter { } } + int32_t wait_time_ms() { + auto runtime_filter_wait_time_ms = _state == nullptr Review Comment: Always set `_query_ctx` and delete all of these checks -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org