This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5e5a483be0f36435c67c13bb459ccfbc7f03ae90 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Mar 5 14:38:11 2025 +0800 test RuntimeFilterMergeControllerEntity (#48678) --- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime_filter/runtime_filter_mgr.cpp | 3 +- be/src/runtime_filter/runtime_filter_mgr.h | 3 +- be/test/pipeline/thrift_builder.h | 17 ++++++++- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 44 +++++++++++++++++++++- 5 files changed, 62 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 17948d71a3a..b550d187c01 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -854,8 +854,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, !params.local_params[0].runtime_filter_params.rid_to_runtime_filter.empty()) { auto handler = std::make_shared<RuntimeFilterMergeControllerEntity>( RuntimeFilterParamsContext::create(context->get_runtime_state())); - RETURN_IF_ERROR(handler->init(params.query_id, params.local_params[0].runtime_filter_params, - params.query_options)); + RETURN_IF_ERROR( + handler->init(params.query_id, params.local_params[0].runtime_filter_params)); query_ctx->set_merge_controller_handler(handler); } diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index 1eb7db6804d..ba7f397c685 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -202,8 +202,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( } Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, - const TRuntimeFilterParams& runtime_filter_params, - const TQueryOptions& query_options) { + const TRuntimeFilterParams& runtime_filter_params) { _query_id = query_id; _mem_tracker = std::make_shared<MemTracker>("RuntimeFilterMergeControllerEntity(experimental)"); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 7709fda7969..b2343ecade1 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -148,8 +148,7 @@ public: : _query_id(0, 0), _state(state) {} ~RuntimeFilterMergeControllerEntity() = default; - Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params, - const TQueryOptions& query_options); + Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params); // handle merge rpc Status merge(std::weak_ptr<QueryContext> query_ctx, const PMergeFilterRequest* request, diff --git a/be/test/pipeline/thrift_builder.h b/be/test/pipeline/thrift_builder.h index 89eb8ed0fda..8b4a65e10d4 100644 --- a/be/test/pipeline/thrift_builder.h +++ b/be/test/pipeline/thrift_builder.h @@ -429,16 +429,29 @@ public: explicit TRuntimeFilterParamsBuilder( TNetworkAddress runtime_filter_merge_addr = TNetworkAddress(), std::map<int, std::vector<TRuntimeFilterTargetParams>> rid_to_target_param = {}, - std::map<int, TRuntimeFilterDesc> rid_to_runtime_filter = {}, std::map<int, int> runtime_filter_builder_num = {}, std::map<int, std::vector<TRuntimeFilterTargetParamsV2>> rid_to_target_paramv2 = {}) : _params() { _params.__set_runtime_filter_merge_addr(runtime_filter_merge_addr); _params.__set_rid_to_target_param(rid_to_target_param); - _params.__set_rid_to_runtime_filter(rid_to_runtime_filter); _params.__set_runtime_filter_builder_num(runtime_filter_builder_num); _params.__set_rid_to_target_paramv2(rid_to_target_paramv2); } + TRuntimeFilterParamsBuilder& add_rid_to_runtime_filter( + int rid, TRuntimeFilterDesc param = TRuntimeFilterDesc()) { + _params.__isset.rid_to_runtime_filter = true; + _params.rid_to_runtime_filter[rid] = param; + return *this; + } + TRuntimeFilterParamsBuilder& add_runtime_filter_builder_num(int rid, int builder_num) { + _params.runtime_filter_builder_num[rid] = builder_num; + return *this; + } + TRuntimeFilterParamsBuilder& add_rid_to_target_paramv2( + int rid, std::vector<TRuntimeFilterTargetParamsV2> target_paramv2 = {}) { + _params.rid_to_target_paramv2[rid] = target_paramv2; + return *this; + } TRuntimeFilterParams& build() { return _params; } TRuntimeFilterParamsBuilder(const TRuntimeFilterParamsBuilder&) = delete; void operator=(const TRuntimeFilterParamsBuilder&) = delete; diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp index 2ba6ce9948c..253631dd45a 100644 --- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -35,7 +35,7 @@ public: void TearDown() override {} }; -TEST_F(RuntimeFilterMgrTest, TestGlobalMgr) { +TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) { auto filter_id = 0; std::shared_ptr<RuntimeFilterMgr> global_runtime_filter_mgr; std::shared_ptr<RuntimeFilterMgr> local_runtime_filter_mgr; @@ -140,4 +140,46 @@ TEST_F(RuntimeFilterMgrTest, TestGlobalMgr) { } } +TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMergeControllerEntity) { + int rid = 1; + UniqueId query_id; + std::shared_ptr<QueryContext> ctx; + std::shared_ptr<RuntimeFilterMergeControllerEntity> entity; + auto profile = std::make_shared<RuntimeProfile>("Test"); + RuntimeState state; + { + // Create + auto query_options = TQueryOptionsBuilder().build(); + auto fe_address = TNetworkAddress(); + fe_address.hostname = BackendOptions::get_localhost(); + fe_address.port = config::brpc_port; + ctx = QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), query_options, fe_address, + true, fe_address, QuerySource::INTERNAL_FRONTEND); + entity = std::make_shared<RuntimeFilterMergeControllerEntity>( + RuntimeFilterParamsContext::create(ctx.get())); + entity->_state->set_state(&state); + } + { + // Init + TRuntimeFilterParams param = + TRuntimeFilterParamsBuilder() + .add_rid_to_runtime_filter( + rid, + TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build()) + .add_rid_to_target_paramv2(rid, {TRuntimeFilterTargetParamsV2()}) + .build(); + EXPECT_FALSE(entity->init(query_id, param).ok()); + + param = TRuntimeFilterParamsBuilder() + .add_rid_to_runtime_filter( + rid, + TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build()) + .add_runtime_filter_builder_num(rid, 1) + .add_rid_to_target_paramv2(rid, {TRuntimeFilterTargetParamsV2()}) + .build(); + EXPECT_TRUE(entity->init(query_id, param).ok()); + EXPECT_EQ(entity->query_id(), query_id); + } +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org