HappenLee commented on code in PR #15040:
URL: https://github.com/apache/doris/pull/15040#discussion_r1050715741


##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -1239,29 +1240,95 @@ Status 
IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr*
 
 bool IRuntimeFilter::await() {
     DCHECK(is_consumer());
-    SCOPED_TIMER(_await_time_cost);
     // bitmap filter is precise filter and only filter once, so it must be 
applied.
     int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
                                     ? _state->query_options().query_timeout
                                     : _state->runtime_filter_wait_time_ms();
-    std::unique_lock<std::mutex> lock(_inner_mutex);
-    if (!_is_ready) {
-        int64_t ms_since_registration = MonotonicMillis() - registration_time_;
-        int64_t ms_remaining = wait_times_ms - ms_since_registration;
-        if (ms_remaining <= 0) {
-            return _is_ready;
+    if (_state->enable_pipeline_exec() &&
+        _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::NOT_READY) {
+        auto expected = RuntimeFilterState::NOT_READY;
+        if (!_rf_state_atomic.compare_exchange_strong(
+                    expected,
+                    MonotonicMillis() - registration_time_ >= wait_times_ms
+                            ? RuntimeFilterState::TIME_OUT
+                            : RuntimeFilterState::NOT_READY,
+                    std::memory_order_acq_rel)) {
+            DCHECK(expected == RuntimeFilterState::READY);
+            return true;
+        }
+        return false;
+    } else if (_state->enable_pipeline_exec() &&
+               _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::TIME_OUT) {
+        return false;
+    } else if (!_state->enable_pipeline_exec()) {
+        SCOPED_TIMER(_await_time_cost);
+        std::unique_lock<std::mutex> lock(_inner_mutex);
+        if (_rf_state != RuntimeFilterState::READY) {
+            int64_t ms_since_registration = MonotonicMillis() - 
registration_time_;
+            int64_t ms_remaining = wait_times_ms - ms_since_registration;
+            _rf_state = RuntimeFilterState::TIME_OUT;
+            if (ms_remaining <= 0) {
+                return false;
+            }
+            return _inner_cv.wait_for(lock, 
std::chrono::milliseconds(ms_remaining),
+                                      [this] { return _rf_state == 
RuntimeFilterState::READY; });
         }
-        return _inner_cv.wait_for(lock, 
std::chrono::milliseconds(ms_remaining),
-                                  [this] { return this->_is_ready; });
     }
     return true;
 }
 
+bool IRuntimeFilter::is_ready_or_timeout() {
+    DCHECK(is_consumer());
+    auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+    // bitmap filter is precise filter and only filter once, so it must be 
applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
+                                    ? _state->query_options().query_timeout
+                                    : _state->runtime_filter_wait_time_ms();
+    int64_t ms_since_registration = MonotonicMillis() - registration_time_;
+    if (!_state->enable_pipeline_exec()) {
+        _rf_state = RuntimeFilterState::TIME_OUT;
+        return true;
+    } else if (is_ready()) {
+        if (cur_state == RuntimeFilterState::NOT_READY) {
+            _profile->add_info_string("EffectTime", 
std::to_string(ms_since_registration) + " ms");
+        }
+        return true;
+    } else {
+        if (cur_state == RuntimeFilterState::NOT_READY) {

Review Comment:
   the logic here is wired? rethink?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to