HappenLee commented on code in PR #15040: URL: https://github.com/apache/doris/pull/15040#discussion_r1050715741
########## be/src/exprs/runtime_filter.cpp: ########## @@ -1239,29 +1240,95 @@ Status IRuntimeFilter::get_prepared_vexprs(std::vector<doris::vectorized::VExpr* bool IRuntimeFilter::await() { DCHECK(is_consumer()); - SCOPED_TIMER(_await_time_cost); // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER ? _state->query_options().query_timeout : _state->runtime_filter_wait_time_ms(); - std::unique_lock<std::mutex> lock(_inner_mutex); - if (!_is_ready) { - int64_t ms_since_registration = MonotonicMillis() - registration_time_; - int64_t ms_remaining = wait_times_ms - ms_since_registration; - if (ms_remaining <= 0) { - return _is_ready; + if (_state->enable_pipeline_exec() && + _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::NOT_READY) { + auto expected = RuntimeFilterState::NOT_READY; + if (!_rf_state_atomic.compare_exchange_strong( + expected, + MonotonicMillis() - registration_time_ >= wait_times_ms + ? 
RuntimeFilterState::TIME_OUT + : RuntimeFilterState::NOT_READY, + std::memory_order_acq_rel)) { + DCHECK(expected == RuntimeFilterState::READY); + return true; + } + return false; + } else if (_state->enable_pipeline_exec() && + _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::TIME_OUT) { + return false; + } else if (!_state->enable_pipeline_exec()) { + SCOPED_TIMER(_await_time_cost); + std::unique_lock<std::mutex> lock(_inner_mutex); + if (_rf_state != RuntimeFilterState::READY) { + int64_t ms_since_registration = MonotonicMillis() - registration_time_; + int64_t ms_remaining = wait_times_ms - ms_since_registration; + _rf_state = RuntimeFilterState::TIME_OUT; + if (ms_remaining <= 0) { + return false; + } + return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining), + [this] { return _rf_state == RuntimeFilterState::READY; }); } - return _inner_cv.wait_for(lock, std::chrono::milliseconds(ms_remaining), - [this] { return this->_is_ready; }); } return true; } +bool IRuntimeFilter::is_ready_or_timeout() { + DCHECK(is_consumer()); + auto cur_state = _rf_state_atomic.load(std::memory_order_acquire); + // bitmap filter is precise filter and only filter once, so it must be applied. + int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER + ? _state->query_options().query_timeout + : _state->runtime_filter_wait_time_ms(); + int64_t ms_since_registration = MonotonicMillis() - registration_time_; + if (!_state->enable_pipeline_exec()) { + _rf_state = RuntimeFilterState::TIME_OUT; + return true; + } else if (is_ready()) { + if (cur_state == RuntimeFilterState::NOT_READY) { + _profile->add_info_string("EffectTime", std::to_string(ms_since_registration) + " ms"); + } + return true; + } else { + if (cur_state == RuntimeFilterState::NOT_READY) { Review Comment: The logic here is weird; please rethink it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org