yiguolei commented on code in PR #33332: URL: https://github.com/apache/doris/pull/33332#discussion_r1554993107
########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -114,82 +100,59 @@ std::string Dependency::debug_string(int indentation_level) { std::string RuntimeFilterDependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, - "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}", - std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), - _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false); + fmt::format_to(debug_string_buffer, "{}, runtime filter: {}", + Dependency::debug_string(indentation_level), _runtime_filter->formatted_state()); return fmt::to_string(debug_string_buffer); } -bool RuntimeFilterTimer::has_ready() { - std::unique_lock<std::mutex> lc(_lock); - return _is_ready; +Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { + std::unique_lock<std::mutex> lc(_task_lock); + auto ready = _ready.load() || _is_cancelled(); + if (!ready && task) { + _add_block_task(task); + task->_blocked_dep = this; + } + return ready ? nullptr : this; } void RuntimeFilterTimer::call_timeout() { - std::unique_lock<std::mutex> lc(_lock); - if (_call_ready) { - return; - } - _call_timeout = true; - if (_parent) { - _parent->sub_filters(_filter_id); - } + _parent->set_timeout(); + _parent->set_ready(); } void RuntimeFilterTimer::call_ready() { - std::unique_lock<std::mutex> lc(_lock); - if (_call_timeout) { - return; - } - _call_ready = true; - if (_parent) { - _parent->sub_filters(_filter_id); - } - _is_ready = true; -} - -void RuntimeFilterTimer::call_has_ready() { - std::unique_lock<std::mutex> lc(_lock); - DCHECK(!_call_timeout); - if (!_call_ready) { - _parent->sub_filters(_filter_id); - } + _parent->set_ready(); } -void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { - const auto filter_id = runtime_filter->filter_id(); - ; - _filters++; - _filter_ready_map[filter_id] = false; - int64_t registration_time = runtime_filter->registration_time(); - int32 wait_time_ms = runtime_filter->wait_time_ms(); - auto filter_timer = std::make_shared<RuntimeFilterTimer>( - filter_id, registration_time, wait_time_ms, - std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this())); - runtime_filter->set_filter_timer(filter_timer); - ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); -} +void RuntimeFilterTimerQueue::start() { + while (!_stop) { + std::unique_lock<std::mutex> lk(cv_m); -void RuntimeFilterDependency::sub_filters(int id) { - std::vector<PipelineXTask*> local_block_task {}; - { - std::lock_guard<std::mutex> lk(_task_lock); - if (!_filter_ready_map[id]) { - _filter_ready_map[id] = true; - _filters--; + cv.wait(lk, [this] { return !_que.empty() || _stop; }); Review Comment: wait at most 3s, and then wake up to check if it is stopped. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org