This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5e5a483be0f36435c67c13bb459ccfbc7f03ae90
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Mar 5 14:38:11 2025 +0800

    test RuntimeFilterMergeControllerEntity (#48678)
---
 be/src/runtime/fragment_mgr.cpp                    |  4 +-
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  3 +-
 be/src/runtime_filter/runtime_filter_mgr.h         |  3 +-
 be/test/pipeline/thrift_builder.h                  | 17 ++++++++-
 be/test/runtime_filter/runtime_filter_mgr_test.cpp | 44 +++++++++++++++++++++-
 5 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 17948d71a3a..b550d187c01 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -854,8 +854,8 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         
!params.local_params[0].runtime_filter_params.rid_to_runtime_filter.empty()) {
         auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>(
                 
RuntimeFilterParamsContext::create(context->get_runtime_state()));
-        RETURN_IF_ERROR(handler->init(params.query_id, 
params.local_params[0].runtime_filter_params,
-                                      params.query_options));
+        RETURN_IF_ERROR(
+                handler->init(params.query_id, 
params.local_params[0].runtime_filter_params));
         query_ctx->set_merge_controller_handler(handler);
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 1eb7db6804d..ba7f397c685 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -202,8 +202,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
 }
 
 Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
-                                                const TRuntimeFilterParams& 
runtime_filter_params,
-                                                const TQueryOptions& 
query_options) {
+                                                const TRuntimeFilterParams& 
runtime_filter_params) {
     _query_id = query_id;
     _mem_tracker = 
std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity(experimental)");
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 7709fda7969..b2343ecade1 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -148,8 +148,7 @@ public:
             : _query_id(0, 0), _state(state) {}
     ~RuntimeFilterMergeControllerEntity() = default;
 
-    Status init(UniqueId query_id, const TRuntimeFilterParams& 
runtime_filter_params,
-                const TQueryOptions& query_options);
+    Status init(UniqueId query_id, const TRuntimeFilterParams& 
runtime_filter_params);
 
     // handle merge rpc
     Status merge(std::weak_ptr<QueryContext> query_ctx, const 
PMergeFilterRequest* request,
diff --git a/be/test/pipeline/thrift_builder.h 
b/be/test/pipeline/thrift_builder.h
index 89eb8ed0fda..8b4a65e10d4 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -429,16 +429,29 @@ public:
     explicit TRuntimeFilterParamsBuilder(
             TNetworkAddress runtime_filter_merge_addr = TNetworkAddress(),
             std::map<int, std::vector<TRuntimeFilterTargetParams>> 
rid_to_target_param = {},
-            std::map<int, TRuntimeFilterDesc> rid_to_runtime_filter = {},
             std::map<int, int> runtime_filter_builder_num = {},
             std::map<int, std::vector<TRuntimeFilterTargetParamsV2>> 
rid_to_target_paramv2 = {})
             : _params() {
         _params.__set_runtime_filter_merge_addr(runtime_filter_merge_addr);
         _params.__set_rid_to_target_param(rid_to_target_param);
-        _params.__set_rid_to_runtime_filter(rid_to_runtime_filter);
         _params.__set_runtime_filter_builder_num(runtime_filter_builder_num);
         _params.__set_rid_to_target_paramv2(rid_to_target_paramv2);
     }
+    TRuntimeFilterParamsBuilder& add_rid_to_runtime_filter(
+            int rid, TRuntimeFilterDesc param = TRuntimeFilterDesc()) {
+        _params.__isset.rid_to_runtime_filter = true;
+        _params.rid_to_runtime_filter[rid] = param;
+        return *this;
+    }
+    TRuntimeFilterParamsBuilder& add_runtime_filter_builder_num(int rid, int 
builder_num) {
+        _params.runtime_filter_builder_num[rid] = builder_num;
+        return *this;
+    }
+    TRuntimeFilterParamsBuilder& add_rid_to_target_paramv2(
+            int rid, std::vector<TRuntimeFilterTargetParamsV2> target_paramv2 
= {}) {
+        _params.rid_to_target_paramv2[rid] = target_paramv2;
+        return *this;
+    }
     TRuntimeFilterParams& build() { return _params; }
     TRuntimeFilterParamsBuilder(const TRuntimeFilterParamsBuilder&) = delete;
     void operator=(const TRuntimeFilterParamsBuilder&) = delete;
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index 2ba6ce9948c..253631dd45a 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -35,7 +35,7 @@ public:
     void TearDown() override {}
 };
 
-TEST_F(RuntimeFilterMgrTest, TestGlobalMgr) {
+TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
     auto filter_id = 0;
     std::shared_ptr<RuntimeFilterMgr> global_runtime_filter_mgr;
     std::shared_ptr<RuntimeFilterMgr> local_runtime_filter_mgr;
@@ -140,4 +140,46 @@ TEST_F(RuntimeFilterMgrTest, TestGlobalMgr) {
     }
 }
 
+TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMergeControllerEntity) {
+    int rid = 1;
+    UniqueId query_id;
+    std::shared_ptr<QueryContext> ctx;
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> entity;
+    auto profile = std::make_shared<RuntimeProfile>("Test");
+    RuntimeState state;
+    {
+        // Create
+        auto query_options = TQueryOptionsBuilder().build();
+        auto fe_address = TNetworkAddress();
+        fe_address.hostname = BackendOptions::get_localhost();
+        fe_address.port = config::brpc_port;
+        ctx = QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), 
query_options, fe_address,
+                                   true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
+        entity = std::make_shared<RuntimeFilterMergeControllerEntity>(
+                RuntimeFilterParamsContext::create(ctx.get()));
+        entity->_state->set_state(&state);
+    }
+    {
+        // Init
+        TRuntimeFilterParams param =
+                TRuntimeFilterParamsBuilder()
+                        .add_rid_to_runtime_filter(
+                                rid,
+                                
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build())
+                        .add_rid_to_target_paramv2(rid, 
{TRuntimeFilterTargetParamsV2()})
+                        .build();
+        EXPECT_FALSE(entity->init(query_id, param).ok());
+
+        param = TRuntimeFilterParamsBuilder()
+                        .add_rid_to_runtime_filter(
+                                rid,
+                                
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build())
+                        .add_runtime_filter_builder_num(rid, 1)
+                        .add_rid_to_target_paramv2(rid, 
{TRuntimeFilterTargetParamsV2()})
+                        .build();
+        EXPECT_TRUE(entity->init(query_id, param).ok());
+        EXPECT_EQ(entity->query_id(), query_id);
+    }
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to