yiguolei commented on code in PR #33332: URL: https://github.com/apache/doris/pull/33332#discussion_r1555104553
########## be/src/vec/exec/runtime_filter_consumer.cpp: ########## @@ -75,36 +77,71 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { } void RuntimeFilterConsumer::init_runtime_filter_dependency( - doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) { - _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf); + std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>& + runtime_filter_dependencies, + const int id, const int node_id, const std::string& name) { + runtime_filter_dependencies.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - _runtime_filter_dependency->add_filters(runtime_filter); + runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>( + id, node_id, name, _state->get_query_ctx(), runtime_filter); + _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get(); + auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>( + runtime_filter->registration_time(), runtime_filter->wait_time_ms(), + runtime_filter_dependencies[i]); + runtime_filter->set_filter_timer(filter_timer); + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); } } -Status RuntimeFilterConsumer::_acquire_runtime_filter() { +Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) { SCOPED_TIMER(_acquire_runtime_filter_timer); std::vector<vectorized::VRuntimeFilterPtr> vexprs; + Status rf_status = Status::OK(); for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - bool ready = runtime_filter->is_ready(); - if (!ready) { - ready = runtime_filter->await(); - } - if (ready && !_runtime_filter_ctxs[i].apply_mark) { - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); - 
_runtime_filter_ctxs[i].apply_mark = true; - } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && - !_runtime_filter_ctxs[i].apply_mark) { - *_blocked_by_rf = true; - } else if (!_runtime_filter_ctxs[i].apply_mark) { - DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); - _is_all_rf_applied = false; + if (pipeline_x) { + DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency) + << _state->pipeline_x_task()->debug_string(); + auto* rf_dep = _runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by( + _state->pipeline_x_task()); + + bool timeout = _runtime_filter_ctxs[i].runtime_filter_dependency->timeout(); Review Comment: 我感觉我们不需要区分 runtime filter 是 ready 了,还是 timeout 了。 1. runtime filter consumer 只需要知道 rf dependency ready 了,然后就开始消费 rf。 2. 区分处理 ready 还是 timeout 会引入一些额外的状态,判断这些状态时容易引入 lock,还会有状态先后顺序的问题。 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org