Copilot commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r3006400026
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -274,10 +272,14 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
auto sync_request = std::make_shared<PSyncFilterSizeRequest>();
sync_request->set_stage(iter->second.stage);
+ // if don't ignore rf error, cancel query
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
create_unique(sync_request,
-
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
+
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(
+
query_ctx->ignore_runtime_filter_error()
+ ?
std::weak_ptr<QueryContext> {}
+ :
query_ctx->weak_from_this()));
Review Comment:
The callback passed into `AutoReleaseClosure::create_unique(...)` is
constructed as a temporary `shared_ptr` and is not retained elsewhere. Because
`AutoReleaseClosure` stores only a `weak_ptr` to the callback, the callback is
destroyed at the end of the full expression that creates the closure. When the
RPC completes, `callback_.lock()` in `Run()` therefore fails, so the new
cancel-on-error logic in `DummyBrpcCallback::call()` never executes. Retain the
callback for the lifetime of the RPC (or adjust `AutoReleaseClosure` to hold a
strong reference).
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -429,7 +431,7 @@ Status
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
DummyBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
+
DummyBrpcCallback<PPublishFilterResponse>::create_shared(ctx));
Review Comment:
Same lifetime issue here:
`DummyBrpcCallback<PPublishFilterResponse>::create_shared(ctx)` is passed as a
temporary into `create_unique(...)` and not stored anywhere. Because
`AutoReleaseClosure` holds only a `weak_ptr`, the callback is destroyed at the
end of that full expression. As a result, `callback_.lock()` in `Run()` fails
and the new error/status cancellation in `DummyBrpcCallback::call()` never
runs. Retain the callback until RPC completion, or make the closure keep a
strong reference.
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -22,10 +22,63 @@
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter.h"
#include "runtime/query_context.h"
-#include "runtime/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
Review Comment:
`SyncSizeCallback` inherits from `DummyBrpcCallback`, but this header does
not include `util/brpc_closure.h` (or another header that defines
`DummyBrpcCallback`). This makes `runtime_filter_producer.h` fragile and can
fail to compile when included from translation units that don't already include
`brpc_closure.h` (the new unit test includes this header directly). Add an
explicit include for `util/brpc_closure.h` (and any other required headers) to
satisfy the header's own dependencies.
##########
be/src/exec/exchange/vdata_stream_sender.h:
##########
@@ -164,6 +164,7 @@ class Channel {
std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(RpcInstance* ins,
bool eos) {
+ // here we reuse the callback because it's re-construction may be
expensive due to many parameters' capture
Review Comment:
Spelling/grammar in comment: "it's re-construction" should be "its
reconstruction".
```suggestion
// here we reuse the callback because its reconstruction may be
expensive due to many parameters' capture
```
##########
be/src/util/brpc_closure.h:
##########
@@ -56,56 +76,38 @@ class DummyBrpcCallback {
// So that we need keep a shared ptr here to ensure that brpc could use
req/rep
// at any stage.
std::shared_ptr<Response> response_;
+
+private:
+ std::weak_ptr<QueryContext> _context;
};
// The closure will be deleted after callback.
// It could only be created by using shared ptr or unique ptr.
-// It will hold a weak ptr of T and call run of T
-// Callback() {
-// xxxx;
-// public
-// void run() {
-// logxxx
-// }
-// }
-//
-// std::shared_ptr<Callback> b;
-//
+// Example:
// std::unique_ptr<AutoReleaseClosure> a(b);
// brpc_call(a.release());
-template <typename T>
-concept HasStatus = requires(T* response) { response->status(); };
-
template <typename Request, typename Callback>
class AutoReleaseClosure : public google::protobuf::Closure {
using Weak = typename std::shared_ptr<Callback>::weak_type;
using ResponseType = typename Callback::ResponseType;
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
public:
- AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback>
callback,
- std::weak_ptr<QueryContext> context = {},
std::string_view error_msg = {})
- : request_(req), callback_(callback), context_(std::move(context))
{
+ AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback>
callback)
+ : request_(req), callback_(callback) {
this->cntl_ = callback->cntl_;
this->response_ = callback->response_;
}
~AutoReleaseClosure() override = default;
- // Will delete itself
+ // Will delete itself. all operations should be done in callback's call().
Run() only do one thing.
void Run() override {
Defer defer {[&]() { delete this; }};
- // If lock failed, it means the callback object is deconstructed, then
no need
- // to deal with the callback any more.
if (auto tmp = callback_.lock()) {
tmp->call();
}
- if (cntl_->Failed()) {
- _process_if_rpc_failed();
- } else {
- _process_status<ResponseType>(response_.get());
- }
}
Review Comment:
`AutoReleaseClosure::Run()` no longer performs any RPC failure / response
status processing after the callback is invoked. Because `AutoReleaseClosure`
stores the callback as a `weak_ptr`, many existing call sites pass a temporary
`DummyBrpcCallback::create_shared(...)` into `create_unique(...)` without
retaining the callback elsewhere; the callback is destroyed immediately and
`callback_.lock()` fails, so `call()` is never invoked and RPC failures/status
errors are now silently ignored. Either ensure `AutoReleaseClosure` keeps a
strong reference to the callback until `Run()` (e.g., store a `shared_ptr`), or
reintroduce error/status handling in `Run()` using state captured before any
callback reuse/mutation.
##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -22,10 +22,63 @@
#include "exec/pipeline/dependency.h"
#include "exec/runtime_filter/runtime_filter.h"
#include "runtime/query_context.h"
-#include "runtime/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+ ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+ SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+ std::shared_ptr<RuntimeFilterWrapper> wrapper,
+ std::weak_ptr<QueryContext> context)
+ : _dependency(std::move(dependency)), _wrapper(wrapper),
_context(std::move(context)) {}
+
+ void call() override {
+ // On error: disable the wrapper and sub the dependency here, because
set_synced_size()
+ // will never be called (the merge node won't respond with a sync).
+ // On success: do NOT sub here. The merge node will respond with
sync_filter_size,
+ // which calls set_synced_size() -> _dependency->sub().
+ if (cntl_->Failed()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
cntl_->ErrorText());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(Status::NetworkError("RPC meet failed: {}",
cntl_->ErrorText()));
+ }
+ }
+ ((CountedFinishDependency*)_dependency.get())->sub();
+ return;
+ }
+
+ Status status = Status::create(response_->status());
+ if (!status.ok()) {
+ if (auto w = _wrapper.lock()) {
+ w->set_state(RuntimeFilterWrapper::State::DISABLED,
status.to_string());
+ }
+ if (auto ctx = _context.lock()) {
+ if (!ctx->ignore_runtime_filter_error()) {
+ ctx->cancel(
+ Status::NetworkError("RPC meet error status: {}",
status.to_string()));
Review Comment:
`SyncSizeCallback::call()` wraps a non-OK response status into
`Status::NetworkError(...)` before cancelling the query. This changes the error
code semantics vs cancelling with the original `status` (e.g. INTERNAL_ERROR
becomes NETWORK_ERROR), which can affect downstream error handling and
reporting. Prefer cancelling with the original `status` (or a richer status
that preserves the original code/message) rather than reclassifying it as a
network error.
```suggestion
ctx->cancel(status);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]