This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 55f31dba0e579007e5eb7594502feda0a25c5e5b
Author: BiteTheDDDDt <x...@selectdb.com>
AuthorDate: Thu Feb 20 13:36:17 2025 +0800

    update
---
 be/src/pipeline/exec/nested_loop_join_build_operator.cpp | 15 +++++++--------
 be/src/runtime_filter/role/consumer.h                    |  3 +--
 be/src/runtime_filter/role/merger.h                      |  3 +--
 be/src/runtime_filter/role/runtime_filter.h              |  5 ++---
 be/src/runtime_filter/runtime_filter_mgr.cpp             |  7 +++----
 be/src/runtime_filter/runtime_filter_mgr.h               |  4 ++--
 be/src/runtime_filter/runtime_filter_wrapper.h           |  3 +--
 7 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index b9481981306..157947028e7 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -36,14 +36,6 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
-    return Status::OK();
-}
-
-Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
-    RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
-    auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
     _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
     for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
         RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i]));
@@ -55,6 +47,13 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
+Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
+    return Status::OK();
+}
+
 Status NestedLoopJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
     RETURN_IF_ERROR(_runtime_filter_slots->process(state, _shared_state->build_blocks));
     RETURN_IF_ERROR(JoinBuildSinkLocalState::close(state, exec_status));
diff --git a/be/src/runtime_filter/role/consumer.h b/be/src/runtime_filter/role/consumer.h
index 818c0eef684..8b5a490baf1 100644
--- a/be/src/runtime_filter/role/consumer.h
+++ b/be/src/runtime_filter/role/consumer.h
@@ -70,8 +70,7 @@ public:
         case State::APPLIED:
             return "APPLIED";
         default:
-            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}",
-                                   int(state));
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state));
         }
     }
 
diff --git a/be/src/runtime_filter/role/merger.h b/be/src/runtime_filter/role/merger.h
index 3d36353a2b1..627f6760afc 100644
--- a/be/src/runtime_filter/role/merger.h
+++ b/be/src/runtime_filter/role/merger.h
@@ -84,8 +84,7 @@ private:
         case State::WAITING_FOR_PRODUCT:
             return "WAITING_FOR_PRODUCT";
         default:
-            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}",
-                                   int(state));
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state));
         }
     }
 
diff --git a/be/src/runtime_filter/role/runtime_filter.h b/be/src/runtime_filter/role/runtime_filter.h
index ef3f9c913b2..0c9c80c0513 100644
--- a/be/src/runtime_filter/role/runtime_filter.h
+++ b/be/src/runtime_filter/role/runtime_filter.h
@@ -108,9 +108,8 @@ protected:
         try {
             _wrapper->check_state(assumed_states);
         } catch (const Exception& e) {
-            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                                   "rf wrapper meet invalid state, {}, {}", e.what(),
-                                   debug_string());
+            throw Exception(ErrorCode::INTERNAL_ERROR, "rf wrapper meet invalid state, {}, {}",
+                            e.what(), debug_string());
         }
     }
 
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 5ab3f3d6e49..ba222c22b94 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -89,7 +89,7 @@ Status RuntimeFilterMgr::register_consumer_filter(
 }
 
 Status RuntimeFilterMgr::register_local_merger_filter(
-        const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options,
+        const TRuntimeFilterDesc& desc, const TQueryOptions& options,
         std::shared_ptr<RuntimeFilterProducer> producer_filter) {
     DCHECK(_is_global);
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
@@ -169,8 +169,7 @@ Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) {
 
 Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc,
-        const std::vector<doris::TRuntimeFilterTargetParamsV2>&& targetv2_info,
-        const int producer_size) {
+        const std::vector<TRuntimeFilterTargetParamsV2>&& targetv2_info, const int producer_size) {
     auto filter_id = runtime_filter_desc->filter_id;
     GlobalMergeContext* cnt_val;
     {
@@ -209,7 +208,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
             RETURN_IF_ERROR(_init_with_desc(
                     &filterid_to_desc.second,
                     targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end()
-                            ? std::vector<doris::TRuntimeFilterTargetParamsV2> {}
+                            ? std::vector<TRuntimeFilterTargetParamsV2> {}
                             : targetv2_iter->second,
                     build_iter->second));
         }
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h
index 573ccb0a4a3..507e6ad085b 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -66,7 +66,7 @@ struct GlobalMergeContext {
     std::mutex mtx;
     std::shared_ptr<RuntimeFilterMerger> merger;
     TRuntimeFilterDesc runtime_filter_desc;
-    std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
+    std::vector<TRuntimeFilterTargetParamsV2> targetv2_info;
     std::unordered_set<UniqueId> arrive_id;
     std::vector<PNetworkAddress> source_addrs;
 };
@@ -170,7 +170,7 @@ public:
 
 private:
     Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
-                           const std::vector<doris::TRuntimeFilterTargetParamsV2>&& target_info,
+                           const std::vector<TRuntimeFilterTargetParamsV2>&& target_info,
                            const int producer_size);
 
     UniqueId _query_id;
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h
index a5a8b6efcec..c51ad4484aa 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -180,8 +180,7 @@ public:
         case State::READY:
             return "READY";
         default:
-            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, "Invalid State {}",
-                                   int(state));
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state));
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to