Copilot commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r3006036210
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -26,6 +26,58 @@
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper,
+ std::weak_ptr<QueryContext> context)
+ : _dependency(std::move(dependency)), _wrapper(wrapper),
_context(std::move(context)) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
cntl_->ErrorText()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ return;
+ }
+
+ Status status = Status::create(response_->status());
+ if (!status.ok()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
status.to_string());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
status.to_string()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ }
Review Comment:
On non-OK response status, the code cancels the query using
`Status::NetworkError("RPC meet failed: {}", status.to_string())`. This
misclassifies an application-level error status as a network error and also
reuses the “RPC meet failed” wording even though the RPC itself succeeded.
Consider cancelling with the actual `status` (or mapping to a more appropriate
error code/message) so error handling/metrics reflect the real failure mode.
##########
be/test/exec/exchange/exchange_sink_test.cpp:
##########
@@ -234,4 +234,138 @@ TEST_F(ExchangeSinkTest, test_queue_size) {
}
}
+// Callback that records the state of response_ and cntl_ at the moment call() is invoked,
+// then mutates them (simulating callback reuse triggering a new RPC). This lets us verify:
+// 1. call() was invoked
+// 2. The callback saw the correct original state (before any mutation)
+// 3. After call(), the shared objects are mutated (so any code reading them after call()
+// would see wrong values — this is the bug the fix prevents)
+template <typename Response>
+class StateCapturingCallback : public DummyBrpcCallback<Response> {
+ ENABLE_FACTORY_CREATOR(StateCapturingCallback);
+
+public:
+ StateCapturingCallback() = default;
+
+ enum class MutateAction {
+ WRITE_ERROR, // Write an error status into response
+ CLEAR_STATUS, // Clear the status field
+ RESET_CNTL, // Call cntl_->Reset()
+ };
+
+ void set_mutate_action(MutateAction action) { _action = action; }
+
+ void call() override {
+ call_invoked = true;
+ // Capture state BEFORE mutation — this is what call() sees.
+ cntl_failed_at_call_time = this->cntl_->Failed();
+ if (this->cntl_->Failed()) {
+ cntl_error_at_call_time = this->cntl_->ErrorText();
+ }
+ response_status_at_call_time =
Status::create(this->response_->status());
+
+ // Now mutate (simulating callback reuse / new RPC)
+ switch (_action) {
+ case MutateAction::WRITE_ERROR: {
+ Status err = Status::InternalError("injected by callback reuse");
+ err.to_protobuf(this->response_->mutable_status());
+ break;
+ }
+ case MutateAction::CLEAR_STATUS: {
+ this->response_->mutable_status()->set_status_code(0);
+ this->response_->mutable_status()->clear_error_msgs();
+ break;
+ }
+ case MutateAction::RESET_CNTL: {
+ this->cntl_->Reset();
+ break;
+ }
+ }
+ }
+
+ // Observable state
+ bool call_invoked = false;
+ bool cntl_failed_at_call_time = false;
+ std::string cntl_error_at_call_time;
+ Status response_status_at_call_time;
+
+private:
+ MutateAction _action = MutateAction::WRITE_ERROR;
+};
+
+using TestCallback = StateCapturingCallback<PTransmitDataResult>;
+
+// Test: Response starts OK. call() writes an error into it.
+// With correct ordering (log-before-call): the closure's logging sees OK (no warning),
+// then call() runs and the callback captures the OK status at call time.
+// With WRONG ordering (call-before-log): call() writes error first, then the closure
+// would log the error — a false positive. We verify call() saw OK at invocation time,
+// proving it ran after (or at least not before) the status was checked by the closure.
+TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) {
+ auto callback = TestCallback::create_shared();
+ // Response starts OK (default).
+ callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR);
+
+ auto req = std::make_shared<PTransmitDataParams>();
+ auto* closure = new AutoReleaseClosure<PTransmitDataParams,
TestCallback>(req, callback);
+
+ closure->Run(); // self-deletes
+
+ EXPECT_TRUE(callback->call_invoked) << "call() should have been invoked";
+ EXPECT_TRUE(callback->response_status_at_call_time.ok())
+ << "call() must see the original OK response status. "
+ "If it saw an error, the ordering is wrong.";
+ EXPECT_FALSE(callback->cntl_failed_at_call_time);
Review Comment:
The new tests don’t actually validate the ordering guarantee they describe.
`StateCapturingCallback::call()` captures `cntl_`/`response_` state before *it*
mutates them, so it will observe the “original” state regardless of whether
`AutoReleaseClosure::Run()` calls `call()` before or after checking
`cntl_->Failed()` / `response_->status()`. Consider asserting an observable
side-effect of the closure’s pre-call check (e.g., capture/expect the warning
log output, or add a hook that records what `AutoReleaseClosure` observed
before invoking `call()`).
##########
be/src/exec/runtime_filter/runtime_filter.cpp:
##########
@@ -39,10 +39,7 @@ Status RuntimeFilter::_push_to_remote(RuntimeState* state,
const TNetworkAddress
auto merge_filter_callback =
DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest,
DummyBrpcCallback<PMergeFilterResponse>>::
- create_unique(merge_filter_request, merge_filter_callback,
-
state->query_options().ignore_runtime_filter_error
- ? std::weak_ptr<QueryContext> {}
- : state->get_query_ctx_weak());
+ create_unique(merge_filter_request, merge_filter_callback);
Review Comment:
This RPC invocation no longer passes a `QueryContext` into
`AutoReleaseClosure`, so failures/error statuses will only be logged and won’t
cancel the query even when `ignore_runtime_filter_error` is false. Given
`_push_to_remote()` returns `Status::OK()` immediately and doesn’t join/inspect
the callback, this can silently drop runtime-filter propagation failures.
Consider reintroducing query-cancel behavior (checked before invoking
`callback->call()` to avoid the reuse race) or handling errors explicitly in
the callback.
```suggestion
create_unique(merge_filter_request,
merge_filter_callback,
state->get_query_ctx());
```
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -26,6 +26,58 @@
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper,
+ std::weak_ptr<QueryContext> context)
+ : _dependency(std::move(dependency)), _wrapper(wrapper),
_context(std::move(context)) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
cntl_->ErrorText()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ return;
+ }
+
+ Status status = Status::create(response_->status());
+ if (!status.ok()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
status.to_string());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
status.to_string()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ }
+ }
+
+private:
+ std::shared_ptr<Dependency> _dependency;
+ // Should use weak ptr here, because when query context deconstructs, should also delete runtime filter
+ // context, it not the memory is not released. And rpc is in another thread, it will hold rf context
+ // after query context because the rpc is not returned.
Review Comment:
Comment grammar is unclear: “context, it not the memory is not released.”
This is hard to read and may be misleading for future maintainers.
```suggestion
// Should use a weak ptr here because when the query context is destroyed, the runtime filter
// context should also be deleted; otherwise its memory may not be released. Since the RPC runs
// in another thread, it may hold the runtime filter context after the query context is destroyed
// if the RPC has not yet returned.
```
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -26,6 +26,58 @@
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper,
+ std::weak_ptr<QueryContext> context)
+ : _dependency(std::move(dependency)), _wrapper(wrapper),
_context(std::move(context)) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
cntl_->ErrorText()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ return;
Review Comment:
`SyncSizeCallback` stores `_dependency` as `std::shared_ptr<Dependency>` but
then downcasts with a C-style cast to `CountedFinishDependency` to call
`sub()`. This is undefined behavior if a different `Dependency` subtype is ever
passed, and it also bypasses type safety. Prefer storing/passing
`std::shared_ptr<CountedFinishDependency>` (or at least
`dynamic_cast`/`dynamic_pointer_cast` with a CHECK) and calling `sub()`
directly.
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -429,7 +423,7 @@ Status
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
DummyBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
+
DummyBrpcCallback<PPublishFilterResponse>::create_shared());
Review Comment:
Similar to `sync_filter_size`, `apply_filterv2` RPCs now use
`AutoReleaseClosure` without any query-context cancellation/propagation. If
these RPCs fail, the query may continue without required runtime filters (or
hang waiting for them depending on downstream logic) with only a warning log.
Consider adding explicit error handling (cancel query when
`ignore_runtime_filter_error` is false, or disable the affected filter and
unblock dependencies) in a callback, while keeping `call()` last to avoid
callback reuse races.
##########
be/src/util/brpc_closure.h:
##########
@@ -96,15 +93,15 @@ class AutoReleaseClosure : public google::protobuf::Closure
{
// Will delete itself
void Run() override {
Defer defer {[&]() { delete this; }};
- // If lock failed, it means the callback object is deconstructed, then
no need
- // to deal with the callback any more.
- if (auto tmp = callback_.lock()) {
- tmp->call();
- }
+ // shouldn't do heavy work here. all heavy work should be done in
callback's call() (which means in success/failure handlers)
if (cntl_->Failed()) {
- _process_if_rpc_failed();
+ LOG(WARNING) << "brpc failed: " << cntl_->ErrorText();
} else {
- _process_status<ResponseType>(response_.get());
+ _log_error_status<ResponseType>(response_.get());
+ }
+ // this must be the LAST operation in this function, because call()
may reuse the callback! (response_ is in callback_)
+ if (auto tmp = callback_.lock()) {
+ tmp->call();
}
Review Comment:
`AutoReleaseClosure::Run()` now only logs `cntl_->Failed()` / response
status errors, and then invokes `callback->call()` last (good for avoiding the
reuse race). However, this also removes the previous ability to cancel a
`QueryContext` on RPC failure/error status, which several runtime-filter call
sites previously depended on. Consider restoring an optional
`QueryContext`/error-handling hook executed *before* `callback->call()` so you
keep the ordering fix without weakening error propagation semantics across the
codebase.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]