yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1554993107


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,59 @@ std::string Dependency::debug_string(int 
indentation_level) {
 
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, 
_blocked_by_rf = {}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
-                   _ready, _filters.load(), _blocked_by_rf ? 
_blocked_by_rf->load() : false);
+    fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+                   Dependency::debug_string(indentation_level), 
_runtime_filter->formatted_state());
     return fmt::to_string(debug_string_buffer);
 }
 
-bool RuntimeFilterTimer::has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && task) {
+        _add_block_task(task);
+        task->_blocked_dep = this;
+    }
+    return ready ? nullptr : this;
 }
 
 void RuntimeFilterTimer::call_timeout() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_ready) {
-        return;
-    }
-    _call_timeout = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_timeout();
+    _parent->set_ready();
 }
 
 void RuntimeFilterTimer::call_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_timeout) {
-        return;
-    }
-    _call_ready = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
-    _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    DCHECK(!_call_timeout);
-    if (!_call_ready) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
-    const auto filter_id = runtime_filter->filter_id();
-    ;
-    _filters++;
-    _filter_ready_map[filter_id] = false;
-    int64_t registration_time = runtime_filter->registration_time();
-    int32 wait_time_ms = runtime_filter->wait_time_ms();
-    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            filter_id, registration_time, wait_time_ms,
-            
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
-    runtime_filter->set_filter_timer(filter_timer);
-    
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+    while (!_stop) {
+        std::unique_lock<std::mutex> lk(cv_m);
 
-void RuntimeFilterDependency::sub_filters(int id) {
-    std::vector<PipelineXTask*> local_block_task {};
-    {
-        std::lock_guard<std::mutex> lk(_task_lock);
-        if (!_filter_ready_map[id]) {
-            _filter_ready_map[id] = true;
-            _filters--;
+        cv.wait(lk, [this] { return !_que.empty() || _stop; });

Review Comment:
   Wait at most 3s here (e.g. `cv.wait_for` rather than an unbounded `cv.wait`), then wake up and check whether `_stop` has been set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to