Copilot commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r2996028911
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -256,7 +254,7 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
create_unique(std::make_shared<PSyncFilterSizeRequest>(),
- DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
+ DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
Review Comment:
This call site previously passed a `QueryContext` weak_ptr (conditionally,
based on `ignore_runtime_filter_error`) into `AutoReleaseClosure` so that
failures could be escalated and the query canceled when the option is false.
With the ctx parameter removed and the callback being `DummyBrpcCallback`,
handling of an RPC failure or a non-OK `status()` appears to be reduced to
logging only, which can change query correctness and termination behavior.
Recommendation (mandatory): either (1) restore ctx-based cancellation semantics
in `AutoReleaseClosure` (capturing the status/failure before invoking
`call()`), or (2) replace `DummyBrpcCallback` with a runtime-filter-specific
callback that performs the required cancel, disable, and `sub()` behavior under
the same option gating.
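
A minimal sketch of option (1), assuming nothing about the real Doris classes:
`FakeQueryContext`, `FakeRpcState`, and `on_rpc_done` are hypothetical
stand-ins, not the actual `QueryContext` or `AutoReleaseClosure` API. It only
illustrates the ordering: copy the RPC outcome first, invoke `call()`, then
decide on cancellation from the copy, with an empty weak_ptr modeling the
`ignore_runtime_filter_error` case.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-in for a query context that can be cancelled.
struct FakeQueryContext {
    bool cancelled = false;
    std::string reason;
    void cancel(const std::string& why) {
        cancelled = true;
        reason = why;
    }
};

// Hypothetical stand-in for the controller/response pair shared with the callback.
struct FakeRpcState {
    bool failed = false;     // transport-level failure
    std::string error_text;  // transport-level error message
    int status_code = 0;     // response status, 0 means OK
    std::string status_msg;  // response error message
};

// Completion handler. The outcome is copied before the callback runs, because
// the callback may reuse and overwrite the shared state.
inline void on_rpc_done(FakeRpcState& shared, const std::function<void()>& call,
                        const std::weak_ptr<FakeQueryContext>& ctx) {
    const FakeRpcState snapshot = shared;  // capture first
    call();                                // may mutate `shared`

    std::string error;
    if (snapshot.failed) {
        error = "rpc failed: " + snapshot.error_text;
    } else if (snapshot.status_code != 0) {
        error = "bad status: " + snapshot.status_msg;
    }
    if (error.empty()) {
        return;
    }
    // An empty or expired weak_ptr means "do not escalate" in this sketch.
    if (auto query = ctx.lock()) {
        query->cancel(error);
    }
}
```

Passing an empty `std::weak_ptr` keeps the log-only behavior, so the option
gating stays at the call site rather than inside the completion handler.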
##########
be/src/exec/runtime_filter/runtime_filter_producer.cpp:
##########
@@ -94,51 +94,43 @@ Status RuntimeFilterProducer::publish(RuntimeState* state,
bool build_hash_table
return Status::OK();
}
-class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
-
DummyBrpcCallback<PSendFilterSizeResponse>> {
- std::shared_ptr<Dependency> _dependency;
- // Should use weak ptr here, because when query context deconstructs,
should also delete runtime filter
- // context, it not the memory is not released. And rpc is in another
thread, it will hold rf context
- // after query context because the rpc is not returned.
- std::weak_ptr<RuntimeFilterWrapper> _wrapper;
- using Base =
- AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>>;
- friend class RuntimeFilterProducer;
- ENABLE_FACTORY_CREATOR(SyncSizeClosure);
-
- void _process_if_rpc_failed() override {
- Defer defer {[&]() {
- Base::_process_if_rpc_failed();
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper)
+ : _dependency(std::move(dependency)), _wrapper(wrapper) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
+ }
((CountedFinishDependency*)_dependency.get())->sub();
- }};
- auto wrapper = _wrapper.lock();
- if (!wrapper) {
return;
}
- wrapper->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
- }
-
- void _process_if_meet_error_status(const Status& status) override {
- Defer defer {[&]() {
- Base::_process_if_meet_error_status(status);
+ Status status = Status::create(response_->status());
+ if (!status.ok()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
status.to_string());
+ }
((CountedFinishDependency*)_dependency.get())->sub();
- }};
- auto wrapper = _wrapper.lock();
- if (!wrapper) {
- return;
}
Review Comment:
The C-style cast to `CountedFinishDependency*` compiles without any diagnostic
and results in undefined behavior if `_dependency` does not actually point to
that type. Recommendation (mandatory): store `_dependency` as
`std::shared_ptr<CountedFinishDependency>` in `SyncSizeCallback` (and pass that
type in), or use a `static_cast` with a debug assertion documenting the
invariant; avoid C-style casts here.
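
Both options can be sketched as follows. The types here
(`FakeDependency`, `FakeCountedDependency`, `TypedCallback`, `as_counted`) are
hypothetical stand-ins and assume only that the counted dependency derives from
a polymorphic base; they are not the Doris classes.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for the polymorphic base dependency type.
struct FakeDependency {
    virtual ~FakeDependency() = default;
};

// Hypothetical stand-in for the counted dependency.
struct FakeCountedDependency : FakeDependency {
    int count = 0;
    void add() { ++count; }
    void sub() { --count; }
};

// Option A: hold the concrete type, so no cast is needed at all.
class TypedCallback {
public:
    explicit TypedCallback(std::shared_ptr<FakeCountedDependency> dependency)
            : _dependency(std::move(dependency)) {}

    void on_error() { _dependency->sub(); }

private:
    std::shared_ptr<FakeCountedDependency> _dependency;
};

// Option B: keep the base pointer, but state and check the invariant.
// The dynamic_cast runs only in builds where assert() is enabled.
inline FakeCountedDependency* as_counted(FakeDependency* dependency) {
    assert(dynamic_cast<FakeCountedDependency*>(dependency) != nullptr);
    return static_cast<FakeCountedDependency*>(dependency);
}
```

Option A moves the check to compile time, which is why it is usually the
simpler choice when the caller already has the concrete type.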
##########
be/src/exec/runtime_filter/runtime_filter_producer.cpp:
##########
@@ -199,13 +191,15 @@ Status RuntimeFilterProducer::send_size(RuntimeState*
state, uint64_t local_filt
}
auto request = std::make_shared<PSendFilterSizeRequest>();
- auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
+ auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper);
+ // Store callback in the producer to keep it alive until the RPC completes.
+ // AutoReleaseClosure holds callbacks via weak_ptr, so without this the callback
+ // would be destroyed when this function returns and error-path sub() would never fire.
+ _sync_size_callback = callback;
// RuntimeFilter maybe deconstructed before the rpc finished, so that could not use
// a raw pointer in closure. Has to use the context's shared ptr.
- auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper,
- state->query_options().ignore_runtime_filter_error
- ? std::weak_ptr<QueryContext> {}
- : state->get_query_ctx_weak());
+ auto closure = AutoReleaseClosure<PSendFilterSizeRequest, SyncSizeCallback>::create_unique(
+ request, callback);
Review Comment:
`_sync_size_callback` is a single slot, but `send_size()` may be invoked more
than once during the producer lifecycle. If a second call happens before the
first RPC completes, this assignment overwrites the stored callback, so the
earlier callback can still be destroyed early (reintroducing the 'weak_ptr
callback dies before completion' failure). Also, the stored callback is never
cleared on completion, which can unnecessarily retain `_dependency` until
producer teardown. Recommendation (mandatory): make the storage support
multiple in-flight RPCs (e.g., a vector/list or map keyed by RPC generation or
request id), and clear the stored entry when the RPC completes (on both success
and error paths), e.g., by having the callback clear its entry through a weak
back-reference to the producer, or by clearing it in the completion handler
that observes the response.
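
One possible shape for such storage, as a sketch only: `InFlightCallbacks` and
`FakeCallback` are hypothetical names, and the monotonically increasing id is
an assumption standing in for whatever RPC generation or request id the
producer already has.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the sync-size callback type.
struct FakeCallback {};

// Keeps one strong reference per in-flight RPC instead of a single slot.
class InFlightCallbacks {
public:
    // Stores `callback` and returns the key to pass to release() later.
    std::uint64_t hold(std::shared_ptr<FakeCallback> callback) {
        std::lock_guard<std::mutex> lock(_mutex);
        const std::uint64_t id = _next_id++;
        _callbacks.emplace(id, std::move(callback));
        return id;
    }

    // Call from the completion path, on success and on error alike.
    void release(std::uint64_t id) {
        std::shared_ptr<FakeCallback> dying;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it = _callbacks.find(id);
            if (it == _callbacks.end()) {
                return;
            }
            dying = std::move(it->second);
            _callbacks.erase(it);
        }
        // `dying` is destroyed here, after the lock has been released.
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _callbacks.size();
    }

private:
    mutable std::mutex _mutex;
    std::uint64_t _next_id = 0;
    std::unordered_map<std::uint64_t, std::shared_ptr<FakeCallback>> _callbacks;
};
```

A second `hold()` does not disturb the first entry, and `release()` drops the
strong reference as soon as the RPC completes, so nothing is retained until
producer teardown.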
##########
be/src/exec/exchange/vdata_stream_sender.h:
##########
@@ -164,6 +164,7 @@ class Channel {
std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(RpcInstance* ins,
bool eos) {
+ // here we reuse the callback because it's re-construction may be
expensive due to many parameters' capture
Review Comment:
Fix grammar/spelling: change "it's re-construction" to "its reconstruction",
or rephrase to avoid the possessive altogether.
```suggestion
// here we reuse the callback because reconstructing it may be
expensive due to capturing many parameters
```
##########
be/test/exec/exchange/exchange_sink_test.cpp:
##########
@@ -234,4 +234,138 @@ TEST_F(ExchangeSinkTest, test_queue_size) {
}
}
+// Callback that records the state of response_ and cntl_ at the moment call()
is invoked,
+// then mutates them (simulating callback reuse triggering a new RPC). This
lets us verify:
+// 1. call() was invoked
+// 2. The callback saw the correct original state (before any mutation)
+// 3. After call(), the shared objects are mutated (so any code reading them
after call()
+// would see wrong values — this is the bug the fix prevents)
+template <typename Response>
+class StateCapturingCallback : public DummyBrpcCallback<Response> {
+ ENABLE_FACTORY_CREATOR(StateCapturingCallback);
+
+public:
+ StateCapturingCallback() = default;
+
+ enum class MutateAction {
+ WRITE_ERROR, // Write an error status into response
+ CLEAR_STATUS, // Clear the status field
+ RESET_CNTL, // Call cntl_->Reset()
+ };
+
+ void set_mutate_action(MutateAction action) { _action = action; }
+
+ void call() override {
+ call_invoked = true;
+ // Capture state BEFORE mutation — this is what call() sees.
+ cntl_failed_at_call_time = this->cntl_->Failed();
+ if (this->cntl_->Failed()) {
+ cntl_error_at_call_time = this->cntl_->ErrorText();
+ }
+ response_status_at_call_time =
Status::create(this->response_->status());
+
+ // Now mutate (simulating callback reuse / new RPC)
+ switch (_action) {
+ case MutateAction::WRITE_ERROR: {
+ Status err = Status::InternalError("injected by callback reuse");
+ err.to_protobuf(this->response_->mutable_status());
+ break;
+ }
+ case MutateAction::CLEAR_STATUS: {
+ this->response_->mutable_status()->set_status_code(0);
+ this->response_->mutable_status()->clear_error_msgs();
+ break;
+ }
+ case MutateAction::RESET_CNTL: {
+ this->cntl_->Reset();
+ break;
+ }
+ }
+ }
+
+ // Observable state
+ bool call_invoked = false;
+ bool cntl_failed_at_call_time = false;
+ std::string cntl_error_at_call_time;
+ Status response_status_at_call_time;
+
+private:
+ MutateAction _action = MutateAction::WRITE_ERROR;
+};
+
+using TestCallback = StateCapturingCallback<PTransmitDataResult>;
+
+// Test: Response starts OK. call() writes an error into it.
+// With correct ordering (log-before-call): the closure's logging sees OK (no
warning),
+// then call() runs and the callback captures the OK status at call time.
+// With WRONG ordering (call-before-log): call() writes error first, then the
closure
+// would log the error — a false positive. We verify call() saw OK at
invocation time,
+// proving it ran after (or at least not before) the status was checked by the
closure.
+TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) {
+ auto callback = TestCallback::create_shared();
+ // Response starts OK (default).
+ callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR);
Review Comment:
The PR description lists failing/passing test names
`test_closure_call_must_not_corrupt_status_check`,
`test_closure_call_must_not_hide_error_status`, and
`test_closure_call_must_not_hide_rpc_failure`, but the added tests in the diff
use different names (e.g., `test_closure_call_sees_original_ok_response`).
Recommendation (optional): update the PR description (or rename tests) so the
referenced test names match what's actually added, to make it easier to
validate the claim.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]