This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 55f31dba0e579007e5eb7594502feda0a25c5e5b Author: BiteTheDDDDt <x...@selectdb.com> AuthorDate: Thu Feb 20 13:36:17 2025 +0800 update --- be/src/pipeline/exec/nested_loop_join_build_operator.cpp | 15 +++++++-------- be/src/runtime_filter/role/consumer.h | 3 +-- be/src/runtime_filter/role/merger.h | 3 +-- be/src/runtime_filter/role/runtime_filter.h | 5 ++--- be/src/runtime_filter/runtime_filter_mgr.cpp | 7 +++---- be/src/runtime_filter/runtime_filter_mgr.h | 4 ++-- be/src/runtime_filter/runtime_filter_wrapper.h | 3 +-- 7 files changed, 17 insertions(+), 23 deletions(-) diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index b9481981306..157947028e7 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -36,14 +36,6 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta SCOPED_TIMER(_init_timer); auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>(); _shared_state->join_op_variants = p._join_op_variants; - return Status::OK(); -} - -Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) { - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); - auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>(); _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size()); for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i])); @@ -55,6 +47,13 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) { return Status::OK(); } +Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); + return Status::OK(); +} + Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { RETURN_IF_ERROR(_runtime_filter_slots->process(state, _shared_state->build_blocks)); RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status)); diff --git a/be/src/runtime_filter/role/consumer.h b/be/src/runtime_filter/role/consumer.h index 818c0eef684..8b5a490baf1 100644 --- a/be/src/runtime_filter/role/consumer.h +++ b/be/src/runtime_filter/role/consumer.h @@ -70,8 +70,7 @@ public: case State::APPLIED: return "APPLIED"; default: - throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}", - int(state)); + throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state)); } } diff --git a/be/src/runtime_filter/role/merger.h b/be/src/runtime_filter/role/merger.h index 3d36353a2b1..627f6760afc 100644 --- a/be/src/runtime_filter/role/merger.h +++ b/be/src/runtime_filter/role/merger.h @@ -84,8 +84,7 @@ private: case State::WAITING_FOR_PRODUCT: return "WAITING_FOR_PRODUCT"; default: - throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}", - int(state)); + throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state)); } } diff --git a/be/src/runtime_filter/role/runtime_filter.h b/be/src/runtime_filter/role/runtime_filter.h index ef3f9c913b2..0c9c80c0513 100644 --- a/be/src/runtime_filter/role/runtime_filter.h +++ b/be/src/runtime_filter/role/runtime_filter.h @@ -108,9 +108,8 @@ protected: try { _wrapper->check_state(assumed_states); } catch (const Exception& e) { - throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "rf wrapper meet invalid state, {}, {}", e.what(), - debug_string()); + throw Exception(ErrorCode::INTERNAL_ERROR, "rf wrapper meet invalid state, {}, {}", + e.what(), debug_string()); } } diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index 5ab3f3d6e49..ba222c22b94 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -89,7 +89,7 @@ Status RuntimeFilterMgr::register_consumer_filter( } Status RuntimeFilterMgr::register_local_merger_filter( - const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, + const TRuntimeFilterDesc& desc, const TQueryOptions& options, std::shared_ptr<RuntimeFilterProducer> producer_filter) { DCHECK(_is_global); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); @@ -169,8 +169,7 @@ Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) { Status RuntimeFilterMergeControllerEntity::_init_with_desc( const TRuntimeFilterDesc* runtime_filter_desc, - const std::vector<doris::TRuntimeFilterTargetParamsV2>&& targetv2_info, - const int producer_size) { + const std::vector<TRuntimeFilterTargetParamsV2>&& targetv2_info, const int producer_size) { auto filter_id = runtime_filter_desc->filter_id; GlobalMergeContext* cnt_val; { @@ -209,7 +208,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, RETURN_IF_ERROR(_init_with_desc( &filterid_to_desc.second, targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end() - ? std::vector<doris::TRuntimeFilterTargetParamsV2> {} + ? std::vector<TRuntimeFilterTargetParamsV2> {} : targetv2_iter->second, build_iter->second)); } diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 573ccb0a4a3..507e6ad085b 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -66,7 +66,7 @@ struct GlobalMergeContext { std::mutex mtx; std::shared_ptr<RuntimeFilterMerger> merger; TRuntimeFilterDesc runtime_filter_desc; - std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info; + std::vector<TRuntimeFilterTargetParamsV2> targetv2_info; std::unordered_set<UniqueId> arrive_id; std::vector<PNetworkAddress> source_addrs; }; @@ -170,7 +170,7 @@ public: private: Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, - const std::vector<doris::TRuntimeFilterTargetParamsV2>&& target_info, + const std::vector<TRuntimeFilterTargetParamsV2>&& target_info, const int producer_size); UniqueId _query_id; diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h index a5a8b6efcec..c51ad4484aa 100644 --- a/be/src/runtime_filter/runtime_filter_wrapper.h +++ b/be/src/runtime_filter/runtime_filter_wrapper.h @@ -180,8 +180,7 @@ public: case State::READY: return "READY"; default: - throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}", - int(state)); + throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org