zhiqiang-hhhh commented on code in PR #27653:
URL: https://github.com/apache/doris/pull/27653#discussion_r1406980370


##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -190,6 +190,58 @@ class RuntimeFilterTimer {
     IRuntimeFilter* _runtime_filter;
 };
 
+struct RuntimeFilterTimerQueue {
+    constexpr static int64_t interval = 50;
+    void run() { _thread.detach(); }
+    void start() {
+        while (!_stop) {
+            std::unique_lock<std::mutex> lk(cv_m);
+
+            cv.wait(lk, [this] { return !_que.empty() || _stop; });
+            {
+                std::unique_lock<std::mutex> lc(_que_lock);
+                std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> 
new_que;
+                for (auto& it : _que) {
+                    if (it.use_count() == 1) {
+                        it->call_has_release();
+                    } else if (it->has_ready()) {
+                        it->call_has_ready();
+                    } else {
+                        int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
+                        if (ms_since_registration > it->wait_time_ms()) {
+                            it->call_timeout();
+                        } else {
+                            new_que.push_back(std::move(it));
+                        }
+                    }
+                }
+                new_que.swap(_que);
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+        }
+        delete this;

Review Comment:
   Lifecycle of RuntimeFilterTimerQueue shuold be controled by ExecEnv



##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -544,6 +546,7 @@ void ExecEnv::destroy() {
     SAFE_STOP(_task_group_manager);
     SAFE_STOP(_external_scan_context_mgr);
     SAFE_STOP(_fragment_mgr);
+    SAFE_STOP(_runtime_filter_timer_queue);

Review Comment:
   `_runtime_filter_timer_queue` should be deleted explicitly by ExecEnv, see 
`SAFE_DELETE`



##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -212,7 +161,7 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* 
runtime_filter) {
             registration_time, wait_time_ms,
             
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()), 
runtime_filter);
     runtime_filter->set_filter_timer(filter_timer);
-    RuntimeFilterTimerQueue::push_filter_timer(filter_timer);
+    
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);

Review Comment:
   `#include "runtime/exec_env.h"`



##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -544,6 +546,7 @@ void ExecEnv::destroy() {
     SAFE_STOP(_task_group_manager);
     SAFE_STOP(_external_scan_context_mgr);
     SAFE_STOP(_fragment_mgr);
+    SAFE_STOP(_runtime_filter_timer_queue);

Review Comment:
   Append  `SAFE_DELETE(_runtime_filter_timer_queue);` to resource destroy area 
of this function.



##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -158,6 +158,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _frontend_client_cache = new 
FrontendServiceClientCache(config::max_client_cache_size_per_host);
     _broker_client_cache = new 
BrokerServiceClientCache(config::max_client_cache_size_per_host);
 
+    _runtime_filter_timer_queue = new 
doris::pipeline::RuntimeFilterTimerQueue();
+    _runtime_filter_timer_queue->run();

Review Comment:
   Does `_runtime_filter_timer_queue` have start dependency? 
   Shuold it be started before `pipeline_task_scheduler` or after? 
   Maybe find it a right place in `ExecEnv::init_pipeline_task_scheduler`



##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -158,6 +158,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _frontend_client_cache = new 
FrontendServiceClientCache(config::max_client_cache_size_per_host);
     _broker_client_cache = new 
BrokerServiceClientCache(config::max_client_cache_size_per_host);
 
+    _runtime_filter_timer_queue = new 
doris::pipeline::RuntimeFilterTimerQueue();

Review Comment:
   create of `_runtime_filter_timer_queue` should be moved to 
`ExecEnv::init_pipeline_task_scheduler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to