yiguolei commented on code in PR #33332:
URL: https://github.com/apache/doris/pull/33332#discussion_r1555105480


##########
be/src/vec/exec/runtime_filter_consumer.cpp:
##########
@@ -75,36 +77,71 @@ bool 
RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = 
std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = 
runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), 
runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
+    Status rf_status = Status::OK();
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, 
vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != 
RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            DCHECK(_runtime_filter_ctxs[i].runtime_filter_dependency)
+                    << _state->pipeline_x_task()->debug_string();
+            auto* rf_dep = 
_runtime_filter_ctxs[i].runtime_filter_dependency->is_blocked_by(
+                    _state->pipeline_x_task());
+
+            bool timeout = 
_runtime_filter_ctxs[i].runtime_filter_dependency->timeout();
+
+            bool ready = rf_dep == nullptr && !timeout;
+            if (!ready) {
+                runtime_filter->await();
+            }
+            if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+                // Runtime filter has been applied in open phase.
+                
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+                _runtime_filter_ctxs[i].apply_mark = true;
+            } else if (rf_dep != nullptr) {
+                // Runtime filter is neither ready nor timeout, so we should 
continue to wait RF.
+                return Status::WaitForRf("Runtime filters are neither not 
ready nor timeout");

Review Comment:
   返回这个没意义了



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to