Copilot commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r3006400026


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -274,10 +272,14 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
             auto sync_request = std::make_shared<PSyncFilterSizeRequest>();
             sync_request->set_stage(iter->second.stage);
 
+            // if don't ignore rf error, cancel query
             auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                               
DummyBrpcCallback<PSyncFilterSizeResponse>>::
                     create_unique(sync_request,
-                                  
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
+                                  
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(
+                                          
query_ctx->ignore_runtime_filter_error()
+                                                  ? 
std::weak_ptr<QueryContext> {}
+                                                  : 
query_ctx->weak_from_this()));

Review Comment:
   The callback passed into `AutoReleaseClosure::create_unique(...)` is 
constructed as a temporary `shared_ptr` and is not retained elsewhere. Because 
`AutoReleaseClosure` stores only a `weak_ptr` to the callback, the callback is 
destroyed as soon as the closure has been constructed, long before the RPC 
completes. `callback_.lock()` in `Run()` then fails, so the new cancel-on-error 
logic in `DummyBrpcCallback::call()` never executes on RPC completion. Retain 
the callback for the lifetime of the RPC (or adjust `AutoReleaseClosure` to 
hold a strong reference).



##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -429,7 +431,7 @@ Status 
RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
         auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
                                           
DummyBrpcCallback<PPublishFilterResponse>>::
                 
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-                              
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
+                              
DummyBrpcCallback<PPublishFilterResponse>::create_shared(ctx));
 

Review Comment:
   Same lifetime issue here: 
`DummyBrpcCallback<PPublishFilterResponse>::create_shared(ctx)` is passed as a 
temporary into `create_unique(...)` and not stored anywhere. With 
`AutoReleaseClosure` holding only a `weak_ptr`, the callback is destroyed 
immediately and the new error/status cancellation in 
`DummyBrpcCallback::call()` will never run. Retain the callback until RPC 
completion or make the closure keep a strong reference.



##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -22,10 +22,63 @@
 #include "exec/pipeline/dependency.h"
 #include "exec/runtime_filter/runtime_filter.h"
 #include "runtime/query_context.h"
-#include "runtime/runtime_profile.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub 
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+    ENABLE_FACTORY_CREATOR(SyncSizeCallback);

Review Comment:
   `SyncSizeCallback` inherits from `DummyBrpcCallback`, but this header does 
not include `util/brpc_closure.h` (or another header that defines 
`DummyBrpcCallback`). This makes `runtime_filter_producer.h` fragile and can 
fail to compile when included from translation units that don't already include 
`brpc_closure.h` (the new unit test includes this header directly). Add an 
explicit include for `util/brpc_closure.h` (and any other required headers) to 
satisfy the header's own dependencies.



##########
be/src/exec/exchange/vdata_stream_sender.h:
##########
@@ -164,6 +164,7 @@ class Channel {
 
     std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(RpcInstance* ins,
                                                                                
  bool eos) {
+        // here we reuse the callback because it's re-construction may be 
expensive due to many parameters' capture

Review Comment:
   Spelling/grammar in comment: "it's re-construction" should be "its 
reconstruction", and "due to many parameters' capture" reads more naturally as 
"as it captures many parameters".
   ```suggestion
           // here we reuse the callback because its reconstruction may be expensive, as it captures many parameters
   ```



##########
be/src/util/brpc_closure.h:
##########
@@ -56,56 +76,38 @@ class DummyBrpcCallback {
     // So that we need keep a shared ptr here to ensure that brpc could use 
req/rep
     // at any stage.
     std::shared_ptr<Response> response_;
+
+private:
+    std::weak_ptr<QueryContext> _context;
 };
 
 // The closure will be deleted after callback.
 // It could only be created by using shared ptr or unique ptr.
-// It will hold a weak ptr of T and call run of T
-// Callback() {
-//  xxxx;
-//  public
-//  void run() {
-//      logxxx
-//  }
-//  }
-//
-//  std::shared_ptr<Callback> b;
-//
+// Example:
 //  std::unique_ptr<AutoReleaseClosure> a(b);
 //  brpc_call(a.release());
 
-template <typename T>
-concept HasStatus = requires(T* response) { response->status(); };
-
 template <typename Request, typename Callback>
 class AutoReleaseClosure : public google::protobuf::Closure {
     using Weak = typename std::shared_ptr<Callback>::weak_type;
     using ResponseType = typename Callback::ResponseType;
     ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
 
 public:
-    AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> 
callback,
-                       std::weak_ptr<QueryContext> context = {}, 
std::string_view error_msg = {})
-            : request_(req), callback_(callback), context_(std::move(context)) 
{
+    AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> 
callback)
+            : request_(req), callback_(callback) {
         this->cntl_ = callback->cntl_;
         this->response_ = callback->response_;
     }
 
     ~AutoReleaseClosure() override = default;
 
-    //  Will delete itself
+    // Will delete itself. all operations should be done in callback's call(). 
Run() only do one thing.
     void Run() override {
         Defer defer {[&]() { delete this; }};
-        // If lock failed, it means the callback object is deconstructed, then 
no need
-        // to deal with the callback any more.
         if (auto tmp = callback_.lock()) {
             tmp->call();
         }
-        if (cntl_->Failed()) {
-            _process_if_rpc_failed();
-        } else {
-            _process_status<ResponseType>(response_.get());
-        }
     }

Review Comment:
   `AutoReleaseClosure::Run()` no longer performs any RPC failure / response 
status processing after the callback is invoked. Because `AutoReleaseClosure` 
stores the callback as a `weak_ptr`, many existing call sites pass a temporary 
`DummyBrpcCallback::create_shared(...)` into `create_unique(...)` without 
retaining the callback elsewhere; the callback is destroyed immediately and 
`callback_.lock()` fails, so `call()` is never invoked and RPC failures/status 
errors are now silently ignored. Either ensure `AutoReleaseClosure` keeps a 
strong reference to the callback until `Run()` (e.g., store a `shared_ptr`), or 
reintroduce error/status handling in `Run()` using state captured before any 
callback reuse/mutation.



##########
be/src/exec/runtime_filter/runtime_filter_producer.h:
##########
@@ -22,10 +22,63 @@
 #include "exec/pipeline/dependency.h"
 #include "exec/runtime_filter/runtime_filter.h"
 #include "runtime/query_context.h"
-#include "runtime/runtime_profile.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
+
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub 
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+    ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+    SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+                     std::shared_ptr<RuntimeFilterWrapper> wrapper,
+                     std::weak_ptr<QueryContext> context)
+            : _dependency(std::move(dependency)), _wrapper(wrapper), 
_context(std::move(context)) {}
+
+    void call() override {
+        // On error: disable the wrapper and sub the dependency here, because 
set_synced_size()
+        // will never be called (the merge node won't respond with a sync).
+        // On success: do NOT sub here. The merge node will respond with 
sync_filter_size,
+        // which calls set_synced_size() -> _dependency->sub().
+        if (cntl_->Failed()) {
+            if (auto w = _wrapper.lock()) {
+                w->set_state(RuntimeFilterWrapper::State::DISABLED, 
cntl_->ErrorText());
+            }
+            if (auto ctx = _context.lock()) {
+                if (!ctx->ignore_runtime_filter_error()) {
+                    ctx->cancel(Status::NetworkError("RPC meet failed: {}", 
cntl_->ErrorText()));
+                }
+            }
+            ((CountedFinishDependency*)_dependency.get())->sub();
+            return;
+        }
+
+        Status status = Status::create(response_->status());
+        if (!status.ok()) {
+            if (auto w = _wrapper.lock()) {
+                w->set_state(RuntimeFilterWrapper::State::DISABLED, 
status.to_string());
+            }
+            if (auto ctx = _context.lock()) {
+                if (!ctx->ignore_runtime_filter_error()) {
+                    ctx->cancel(
+                            Status::NetworkError("RPC meet error status: {}", 
status.to_string()));

Review Comment:
   `SyncSizeCallback::call()` wraps a non-OK response status into 
`Status::NetworkError(...)` before cancelling the query. This changes the error 
code semantics vs cancelling with the original `status` (e.g. INTERNAL_ERROR 
becomes NETWORK_ERROR), which can affect downstream error handling and 
reporting. Prefer cancelling with the original `status` (or a richer status 
that preserves the original code/message) rather than reclassifying it as a 
network error.
   ```suggestion
                       ctx->cancel(status);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to