github-actions[bot] commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r3005034124
##########
be/src/exec/runtime_filter/runtime_filter_producer.cpp:
##########
@@ -98,51 +98,43 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table
return Status::OK();
}
-class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
- DummyBrpcCallback<PSendFilterSizeResponse>> {
- std::shared_ptr<Dependency> _dependency;
- // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
- // context, it not the memory is not released. And rpc is in another thread, it will hold rf context
- // after query context because the rpc is not returned.
- std::weak_ptr<RuntimeFilterWrapper> _wrapper;
- using Base =
- AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
- friend class RuntimeFilterProducer;
- ENABLE_FACTORY_CREATOR(SyncSizeClosure);
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
- void _process_if_rpc_failed() override {
- Defer defer {[&]() {
- Base::_process_if_rpc_failed();
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper)
+ : _dependency(std::move(dependency)), _wrapper(wrapper) {}
+
+ void call() override {
Review Comment:
**[Behavioral regression]** The old `SyncSizeClosure` called
`Base::_process_if_rpc_failed()` in its `Defer`, which would invoke
`ctx->cancel(Status::NetworkError("RPC meet failed: ..."))` when
`ignore_runtime_filter_error == false`. This cancelled the query on RF RPC
failure.
The new `SyncSizeCallback::call()` degrades gracefully (it disables the filter
and subs the dependency) but **never cancels the query context**, regardless of
`ignore_runtime_filter_error`.
This breaks the existing regression test
`regression-test/suites/query_p0/join/test_inject_send_filter_size_fail/test_inject_send_filter_size_fail.groovy`
(lines 80-86), which expects:
```groovy
sql "set ignore_runtime_filter_error = false"
test {
sql "...";
exception "RPC meet failed"
}
```
The test expects the query to fail with `"RPC meet failed"` when the debug
point injects an RPC failure and `ignore_runtime_filter_error=false`. With this
PR, the query will succeed (with degraded performance) instead.
**Recommendation**: Either:
1. Update the regression test to reflect the new behavior (query succeeds
even on RF RPC failure), OR
2. Add query-context cancellation logic to `SyncSizeCallback::call()` when
`ignore_runtime_filter_error == false` (requires passing the flag or a weak
QueryContext to the callback).
Option 1 seems more consistent with the PR's design philosophy of graceful
degradation.
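If option 2 is preferred, a minimal sketch of its shape follows. It assumes the callback is handed a `std::weak_ptr<QueryContext>` that is left empty when `ignore_runtime_filter_error` is true, the same trick the removed `_push_to_remote` code used. `Status`, `QueryContext`, `Dependency` and `RuntimeFilterWrapper` below are hypothetical stand-ins, and `SyncSizeCallbackSketch` / `ctx_for_callback` are illustrative names, not Doris APIs.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins: Doris's real Status, QueryContext, Dependency and
// RuntimeFilterWrapper have richer interfaces than these.
struct Status {
    std::string msg;
    static Status NetworkError(std::string m) { return Status {std::move(m)}; }
};

struct QueryContext {
    bool cancelled = false;
    std::string reason;
    void cancel(const Status& s) {
        cancelled = true;
        reason = s.msg;
    }
};

struct Dependency {
    int pending = 1;
    void sub() { --pending; }
};

struct RuntimeFilterWrapper {
    bool disabled = false;
    void disable() { disabled = true; }
};

// Mirrors the removed _push_to_remote ternary: an empty weak_ptr means
// "never cancel", which is what ignore_runtime_filter_error == true wants.
std::weak_ptr<QueryContext> ctx_for_callback(bool ignore_runtime_filter_error,
                                             const std::shared_ptr<QueryContext>& ctx) {
    return ignore_runtime_filter_error ? std::weak_ptr<QueryContext> {}
                                       : std::weak_ptr<QueryContext>(ctx);
}

// Hypothetical variant of SyncSizeCallback that also carries a weak QueryContext.
class SyncSizeCallbackSketch {
public:
    SyncSizeCallbackSketch(std::shared_ptr<Dependency> dependency,
                           std::weak_ptr<RuntimeFilterWrapper> wrapper,
                           std::weak_ptr<QueryContext> ctx)
            : _dependency(std::move(dependency)),
              _wrapper(std::move(wrapper)),
              _ctx(std::move(ctx)) {}

    // rpc_failed / error_text stand in for cntl_->Failed() / ErrorText().
    // Only the failure path is modelled; success handling is omitted.
    void call(bool rpc_failed, const std::string& error_text) {
        if (!rpc_failed) {
            return;
        }
        // Graceful degradation, as the PR already does.
        if (auto wrapper = _wrapper.lock()) {
            wrapper->disable();
        }
        _dependency->sub();
        // Restored behaviour: cancel only if a query context was supplied.
        if (auto ctx = _ctx.lock()) {
            ctx->cancel(Status::NetworkError("RPC meet failed: " + error_text));
        }
    }

private:
    std::shared_ptr<Dependency> _dependency;
    std::weak_ptr<RuntimeFilterWrapper> _wrapper;
    std::weak_ptr<QueryContext> _ctx;
};
```

With this shape the degradation path stays identical in both modes, and only the strict mode (`ignore_runtime_filter_error = false`) additionally cancels, which is what the existing regression test expects.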
##########
be/src/util/brpc_closure.h:
##########
@@ -96,15 +93,15 @@ class AutoReleaseClosure : public google::protobuf::Closure {
// Will delete itself
void Run() override {
Defer defer {[&]() { delete this; }};
- // If lock failed, it means the callback object is deconstructed, then no need
- // to deal with the callback any more.
- if (auto tmp = callback_.lock()) {
- tmp->call();
- }
+ // shouldn't do heavy work here. all heavy work should be done in callback's call() (which means in success/failure handlers)
if (cntl_->Failed()) {
- _process_if_rpc_failed();
+ LOG(WARNING) << "brpc failed: " << cntl_->ErrorText();
} else {
- _process_status<ResponseType>(response_.get());
+ _log_error_status<ResponseType>(response_.get());
+ }
+ // this must be the LAST operation in this function, because call() may reuse the callback! (response_ is in callback_)
+ if (auto tmp = callback_.lock()) {
+ tmp->call();
Review Comment:
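**[Observation - looks good]** The reordering is correct: the controller and
response status are checked and logged first, and `call()` runs last. The
comment clearly documents why `call()` must be last: `call()` may reuse the
callback, which holds `response_`, so `Run()` must not read `response_` after
handing control to it. This is the core fix, and it correctly addresses the
data race.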
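The ordering hazard can be seen in a single-threaded model. This sketch assumes the closure and the callback share ownership of the response, and that `call()` "reuses" the callback by resetting that response for the next RPC. `Response`, `Callback` and `ClosureSketch` are hypothetical names, not the Doris `AutoReleaseClosure` or `DummyBrpcCallback` classes.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical model: the closure and the callback share the response, and the
// callback may reuse it (reset it for the next RPC) inside call().
struct Response {
    std::string status = "OK";
};

struct Callback {
    std::shared_ptr<Response> response = std::make_shared<Response>();
    std::vector<std::string>* log;
    explicit Callback(std::vector<std::string>* l) : log(l) {}
    void call() {
        log->push_back("call");
        response->status.clear();  // reuse: response is reset for the next request
    }
};

// Sketch of the reordered AutoReleaseClosure::Run(); not the Doris class.
class ClosureSketch {
public:
    ClosureSketch(const std::shared_ptr<Callback>& cb, bool failed,
                  std::vector<std::string>* log)
            : callback_(cb), response_(cb->response), failed_(failed), log_(log) {}

    void Run() {
        // Stands in for `Defer defer {[&]() { delete this; }};`.
        std::unique_ptr<ClosureSketch> self(this);
        // 1. Inspect controller/response state before call() can reuse it.
        if (failed_) {
            log_->push_back("brpc failed");
        } else if (response_->status != "OK") {
            log_->push_back("bad status: " + response_->status);
        }
        // 2. call() goes last, because it may reuse the response.
        if (auto tmp = callback_.lock()) {
            tmp->call();
        }
    }

private:
    std::weak_ptr<Callback> callback_;
    std::shared_ptr<Response> response_;
    bool failed_;
    std::vector<std::string>* log_;
};
```

Swapping steps 1 and 2 makes the first log entry read `bad status: ` with the real error lost, because `call()` has already cleared the shared response. In the real code the reuse can happen on another thread, which turns that lost read into a data race.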
##########
be/src/exec/runtime_filter/runtime_filter.cpp:
##########
@@ -39,10 +39,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state, const TNetworkAddress
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
- create_unique(merge_filter_request, merge_filter_callback,
- state->query_options().ignore_runtime_filter_error
- ? std::weak_ptr<QueryContext> {}
- : state->get_query_ctx_weak());
Review Comment:
**[Behavioral change]** `_push_to_remote` sends a `merge_filter` RPC with a
`DummyBrpcCallback`, whose `call()` is empty. Previously, when
`ignore_runtime_filter_error == false`, the closure received the query context
(`state->get_query_ctx_weak()`) and cancelled it on RPC failure. Now, failures
are only logged as warnings.
This is functionally safe (runtime filters are a performance optimization and
do not affect query correctness), but the behavioral change should be
documented in the PR's release notes. A merge_filter failure means the runtime
filter never completes at the merge node: consumers wait until the RF timeout
expires and then proceed without the filter.
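The cost of a lost merge_filter RPC is the full RF wait on every consumer. A minimal model of that wait, assuming a plain `std::condition_variable` with a timeout, is shown here; `RuntimeFilterSlot` is a hypothetical class, and Doris's real consumers use their own dependency and timer machinery rather than this code.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical consumer-side view of one runtime filter.
class RuntimeFilterSlot {
public:
    // Called when the merged filter arrives. If the merge_filter RPC failed,
    // nobody ever calls this.
    void publish() {
        {
            std::lock_guard<std::mutex> lock(mu_);
            ready_ = true;
        }
        cv_.notify_all();
    }

    // Returns true if the filter arrived within the timeout; false means the
    // consumer gives up and proceeds without the filter.
    bool wait_for_filter(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mu_);
        return cv_.wait_for(lock, timeout, [this] { return ready_; });
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    bool ready_ = false;
};
```

In the failure case `wait_for_filter` only returns after the whole timeout has elapsed, so the query still succeeds but pays the RF wait time without getting any filtering benefit.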
--
This is an automated message from the Apache Git Service.