BiteTheDDDDt commented on code in PR #64851:
URL: https://github.com/apache/doris/pull/64851#discussion_r3504117218


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,189 @@
 
 namespace doris {
 
+namespace {
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+        const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    std::vector<RuntimeFilterPublishTarget> publish_targets;
+    publish_targets.reserve(targets.size());
+    for (const auto& target : targets) {
+        DORIS_CHECK(target.__isset.target_fragment_ids);
+        DORIS_CHECK(!target.target_fragment_ids.empty());
+        RuntimeFilterPublishTarget publish_target;
+        
publish_target.addr.set_hostname(target.target_fragment_instance_addr.hostname);
+        
publish_target.addr.set_port(target.target_fragment_instance_addr.port);
+        publish_target.fragment_ids = target.target_fragment_ids;
+        publish_targets.emplace_back(std::move(publish_target));
+    }
+    return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+    RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2> 
request,
+                                 std::weak_ptr<QueryContext> query_ctx)
+            : _request(std::move(request)),
+              
_callback(HandleErrorBrpcCallback<PPublishFilterResponse>::create_shared(
+                      std::move(query_ctx))) {}
+
+    void Run() override {
+        std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
+        _callback->call();
+    }
+
+    brpc::Controller* cntl() { return _callback->cntl_.get(); }
+    PPublishFilterRequestV2* request() { return _request.get(); }
+    PPublishFilterResponse* response() { return _callback->response_.get(); }
+
+private:
+    std::shared_ptr<PPublishFilterRequestV2> _request;
+    std::shared_ptr<HandleErrorBrpcCallback<PPublishFilterResponse>> _callback;
+};
+
+Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
+                                     const butil::IOBuf& request_attachment, 
int timeout_ms,
+                                     std::weak_ptr<QueryContext> query_ctx) {
+    std::shared_ptr<PBackendService_Stub> stub(
+            
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(task.receiver.addr));
+    if (stub == nullptr) {
+        LOG(WARNING) << "Failed to init runtime filter relay rpc to "
+                     << task.receiver.addr.hostname() << ":" << 
task.receiver.addr.port();
+        return Status::InternalError("Failed to init runtime filter relay rpc 
to {}:{}",
+                                     task.receiver.addr.hostname(), 
task.receiver.addr.port());
+    }
+
+    // brpc calls Run() exactly once; RuntimeFilterRelayRpcClosure deletes 
itself there.
+    auto* closure = new RuntimeFilterRelayRpcClosure(
+            std::make_shared<PPublishFilterRequestV2>(task.request), 
std::move(query_ctx));
+    if (request_attachment.size() > 0) {
+        closure->cntl()->request_attachment().append(request_attachment);
+    }
+    closure->cntl()->set_timeout_ms(timeout_ms);
+    if (config::execution_ignore_eovercrowded) {
+        closure->cntl()->ignore_eovercrowded();
+    }
+    stub->apply_filterv2(closure->cntl(), closure->request(), 
closure->response(), closure);
+    return Status::OK();
+}
+
+void set_request_direct_publish_target(const TRuntimeFilterTargetParamsV2& 
target,
+                                       PPublishFilterRequestV2* request) {
+    DORIS_CHECK(target.__isset.target_fragment_ids);
+    DORIS_CHECK(!target.target_fragment_ids.empty());
+    for (const auto& target_fragment_id : target.target_fragment_ids) {
+        request->add_fragment_ids(target_fragment_id);
+    }
+}
+
+} // namespace
+
+std::vector<std::vector<RuntimeFilterPublishTarget>> 
split_runtime_filter_publish_targets(
+        const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+    DORIS_CHECK(!targets.empty());
+    DORIS_CHECK(fanout > 0);
+    size_t slice_count = std::min(targets.size(), static_cast<size_t>(fanout));
+    std::vector<std::vector<RuntimeFilterPublishTarget>> slices;
+    slices.reserve(slice_count);
+    for (size_t offset = 0; offset < targets.size();) {
+        size_t remaining_targets = targets.size() - offset;
+        size_t remaining_slices = slice_count - slices.size();
+        size_t slice_size = (remaining_targets + remaining_slices - 1) / 
remaining_slices;
+        slices.emplace_back(targets.begin() + offset, targets.begin() + offset 
+ slice_size);
+        offset += slice_size;
+    }
+    return slices;
+}
+
+std::vector<RuntimeFilterPublishTask> build_runtime_filter_publish_tasks(
+        const PPublishFilterRequestV2& base_request,
+        const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+    std::vector<RuntimeFilterPublishTask> tasks;
+    auto slices = split_runtime_filter_publish_targets(targets, fanout);
+    PPublishFilterRequestV2 request_template = base_request;
+    request_template.clear_fragment_ids();
+    request_template.clear_fragment_instance_ids();
+    request_template.clear_forward_targets();
+    tasks.reserve(slices.size());
+    for (const auto& slice : slices) {
+        DORIS_CHECK(!slice.empty());
+        RuntimeFilterPublishTask task;
+        task.receiver = slice.front();
+        task.request = request_template;
+
+        for (int32_t fragment_id : task.receiver.fragment_ids) {
+            task.request.add_fragment_ids(fragment_id);
+        }
+        for (size_t i = 1; i < slice.size(); ++i) {
+            DORIS_CHECK(!slice[i].fragment_ids.empty());
+            PPublishFilterForwardTarget* forward_target = 
task.request.add_forward_targets();
+            forward_target->mutable_target_addr()->CopyFrom(slice[i].addr);
+            for (int32_t fragment_id : slice[i].fragment_ids) {
+                forward_target->add_fragment_ids(fragment_id);

Review Comment:
   I think we should not reuse the receiver fragment ids here. 
`task.receiver.fragment_ids` is for the current relay receiver to apply 
locally, while each `forward_target` must carry the downstream target address 
own `fragment_ids`. FE builds `target_fragment_ids` per target address by 
appending each RF target scan fragment id, so a single RF can have multiple 
target scans and the fragment id lists are not guaranteed to be identical.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to