This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 711b156a78dcebf7c39aede23f335fdd05b62b87
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed Jan 31 16:29:52 2024 +0800

    [Refactor][Rf] remove useless code in RF (#30597)
---
 be/src/exprs/bloom_filter_func.h          |  3 +-
 be/src/exprs/runtime_filter.cpp           | 95 +++++++++++--------------------
 be/src/exprs/runtime_filter.h             |  6 +-
 be/src/exprs/runtime_filter_slots.h       | 13 ++---
 be/src/exprs/runtime_filter_slots_cross.h |  2 +-
 be/src/runtime/runtime_filter_mgr.cpp     |  2 +-
 6 files changed, 47 insertions(+), 74 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index ed4205a7e0d..84e6eba1e44 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -169,10 +169,9 @@ public:
         return _bloom_filter->init(data, data_size);
     }
 
-    Status get_data(char** data, int* len) {
+    void get_data(char** data, int* len) {
         *data = _bloom_filter->data();
         *len = _bloom_filter->size();
-        return Status::OK();
     }
 
     size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 
0; }
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index bf1db5ff867..06d1c452fdd 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -67,7 +67,6 @@
 namespace doris {
 
 // PrimitiveType-> PColumnType
-// TODO: use constexpr if we use c++14
 PColumnType to_proto(PrimitiveType type) {
     switch (type) {
     case TYPE_BOOLEAN:
@@ -118,7 +117,6 @@ PColumnType to_proto(PrimitiveType type) {
 }
 
 // PColumnType->PrimitiveType
-// TODO: use constexpr if we use c++14
 PrimitiveType to_primitive_type(PColumnType type) {
     switch (type) {
     case PColumnType::COLUMN_TYPE_BOOL:
@@ -281,12 +279,8 @@ class RuntimePredicateWrapper {
 public:
     RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* 
pool,
                             const RuntimeFilterParams* params)
-            : _state(state),
-              _be_exec_version(_state->be_exec_version),
-              _pool(pool),
-              _column_return_type(params->column_return_type),
-              _filter_type(params->filter_type),
-              _filter_id(params->filter_id) {}
+            : RuntimePredicateWrapper(state, pool, params->column_return_type, 
params->filter_type,
+                                      params->filter_id) {};
     // for a 'tmp' runtime predicate wrapper
     // only could called assign method or as a param for merge
     RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* 
pool,
@@ -350,7 +344,7 @@ public:
             insert_to_bloom_filter(bf);
         }
         // release in filter
-        _context.hybrid_set.reset(create_set(_column_return_type));
+        _context.hybrid_set.reset();
     }
 
     Status init_bloom_filter(const size_t build_bf_cardinality) {
@@ -475,12 +469,12 @@ public:
                 break;
             } else if (wrapper->_is_ignored_in_filter) {
                 VLOG_DEBUG << " ignore merge runtime filter(in filter id " << 
_filter_id
-                           << ") because: " << 
*(wrapper->get_ignored_in_filter_msg());
+                           << ") because: " << 
wrapper->get_ignored_in_filter_msg();
 
                 _is_ignored_in_filter = true;
                 _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg;
                 // release in filter
-                _context.hybrid_set.reset(create_set(_column_return_type));
+                _context.hybrid_set.reset();
                 break;
             }
             // try insert set
@@ -491,14 +485,14 @@ public:
                 msg << " ignore merge runtime filter(in filter id " << 
_filter_id
                     << ") because: in_num(" << _context.hybrid_set->size() << 
") >= max_in_num("
                     << _max_in_num << ")";
-                _ignored_in_filter_msg = _pool->add(new 
std::string(msg.str()));
+                _ignored_in_filter_msg = std::string(msg.str());
 #else
-                _ignored_in_filter_msg = _pool->add(new 
std::string("ignored"));
+                _ignored_in_filter_msg = std::string("ignored");
 #endif
                 _is_ignored_in_filter = true;
 
                 // release in filter
-                _context.hybrid_set.reset(create_set(_column_return_type));
+                _context.hybrid_set.reset();
             }
             break;
         }
@@ -529,7 +523,7 @@ public:
                     CHECK(!wrapper->_is_ignored_in_filter)
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
-                            << *(wrapper->get_ignored_in_filter_msg());
+                            << wrapper->get_ignored_in_filter_msg();
                     
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
                     if (_max_in_num >= 0 && _context.hybrid_set->size() >= 
_max_in_num) {
                         VLOG_DEBUG << " change runtime filter to bloom 
filter(id=" << _filter_id
@@ -549,7 +543,7 @@ public:
                     CHECK(!wrapper->_is_ignored_in_filter)
                             << " can not ignore merge runtime filter(in filter 
id "
                             << wrapper->_filter_id << ") when used 
IN_OR_BLOOM_FILTER, ignore msg: "
-                            << *(wrapper->get_ignored_in_filter_msg());
+                            << wrapper->get_ignored_in_filter_msg();
                     
wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get());
                     // bloom filter merge bloom filter
                 } else {
@@ -566,14 +560,15 @@ public:
     }
 
     Status assign(const PInFilter* in_filter) {
-        PrimitiveType type = to_primitive_type(in_filter->column_type());
         if (in_filter->has_ignored_msg()) {
             VLOG_DEBUG << "Ignore in filter(id=" << _filter_id
                        << ") because: " << in_filter->ignored_msg();
             _is_ignored_in_filter = true;
-            _ignored_in_filter_msg = _pool->add(new 
std::string(in_filter->ignored_msg()));
+            _ignored_in_filter_msg = in_filter->ignored_msg();
             return Status::OK();
         }
+
+        PrimitiveType type = to_primitive_type(in_filter->column_type());
         _context.hybrid_set.reset(create_set(type));
         switch (type) {
         case TYPE_BOOLEAN: {
@@ -883,19 +878,15 @@ public:
         return Status::InvalidArgument("not support!");
     }
 
-    Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) {
-        *it = _context.hybrid_set->begin();
-        return Status::OK();
-    }
+    HybridSetBase::IteratorBase* get_in_filter_iterator() { return 
_context.hybrid_set->begin(); }
 
-    Status get_bloom_filter_desc(char** data, int* filter_length) {
-        return _context.bloom_filter_func->get_data(data, filter_length);
+    void get_bloom_filter_desc(char** data, int* filter_length) {
+        _context.bloom_filter_func->get_data(data, filter_length);
     }
 
-    Status get_minmax_filter_desc(void** min_data, void** max_data) {
+    void get_minmax_filter_desc(void** min_data, void** max_data) {
         *min_data = _context.minmax_func->get_min();
         *max_data = _context.minmax_func->get_max();
-        return Status::OK();
     }
 
     PrimitiveType column_type() { return _column_return_type; }
@@ -904,7 +895,7 @@ public:
 
     bool is_ignored_in_filter() const { return _is_ignored_in_filter; }
 
-    std::string* get_ignored_in_filter_msg() const { return 
_ignored_in_filter_msg; }
+    const std::string& get_ignored_in_filter_msg() const { return 
_ignored_in_filter_msg; }
 
     void batch_assign(const PInFilter* filter,
                       void (*assign_func)(std::shared_ptr<HybridSetBase>& 
_hybrid_set,
@@ -948,7 +939,7 @@ private:
     vectorized::SharedRuntimeFilterContext _context;
     bool _is_bloomfilter = false;
     bool _is_ignored_in_filter = false;
-    std::string* _ignored_in_filter_msg = nullptr;
+    std::string _ignored_in_filter_msg;
     uint32_t _filter_id;
 };
 
@@ -962,13 +953,8 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* 
state, ObjectPool* poo
                                   is_global ? false : build_bf_exactly);
 }
 
-void 
IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& 
context) {
-    context = _wrapper->_context;
-}
-
-Status 
IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterContext&
 context) {
-    _wrapper->_context = context;
-    return Status::OK();
+vectorized::SharedRuntimeFilterContext& 
IRuntimeFilter::get_shared_context_ref() {
+    return _wrapper->_context;
 }
 
 void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
@@ -1170,7 +1156,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {
     _is_ignored = true;
     if (_wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
         _wrapper->_is_ignored_in_filter = true;
-        _wrapper->_ignored_in_filter_msg = _pool->add(new std::string(msg));
+        _wrapper->_ignored_in_filter_msg = msg;
     }
 }
 
@@ -1363,14 +1349,12 @@ void 
IRuntimeFilter::update_runtime_filter_type_to_profile() {
 
 Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
     if (!_is_ignored && wrapper->is_ignored_in_filter()) {
-        std::string* msg = wrapper->get_ignored_in_filter_msg();
-        set_ignored(msg ? *msg : "");
+        set_ignored(wrapper->get_ignored_in_filter_msg());
     }
     auto origin_type = _wrapper->get_real_type();
     Status status = _wrapper->merge(wrapper);
     if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
-        std::string* msg = _wrapper->get_ignored_in_filter_msg();
-        set_ignored(msg ? *msg : "");
+        set_ignored(_wrapper->get_ignored_in_filter_msg());
     }
     if (origin_type != _wrapper->get_real_type()) {
         update_runtime_filter_type_to_profile();
@@ -1403,7 +1387,7 @@ Status IRuntimeFilter::serialize_impl(T* request, void** 
data, int* len) {
         auto in_filter = request->mutable_in_filter();
         to_protobuf(in_filter);
     } else if (real_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) {
-        RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc((char**)data, len));
+        _wrapper->get_bloom_filter_desc((char**)data, len);
         DCHECK(data != nullptr);
         request->mutable_bloom_filter()->set_filter_length(*len);
         request->mutable_bloom_filter()->set_always_true(false);
@@ -1427,8 +1411,7 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
         return;
     }
 
-    HybridSetBase::IteratorBase* it;
-    static_cast<void>(_wrapper->get_in_filter_iterator(&it));
+    auto it = _wrapper->get_in_filter_iterator();
     DCHECK(it != nullptr);
 
     switch (column_type) {
@@ -1554,7 +1537,7 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
 void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
     void* min_data = nullptr;
     void* max_data = nullptr;
-    static_cast<void>(_wrapper->get_minmax_filter_desc(&min_data, &max_data));
+    _wrapper->get_minmax_filter_desc(&min_data, &max_data);
     DCHECK(min_data != nullptr && max_data != nullptr);
     filter->set_column_type(to_proto(_wrapper->column_type()));
 
@@ -1673,7 +1656,8 @@ bool IRuntimeFilter::is_bloomfilter() {
     return _wrapper->is_bloomfilter();
 }
 
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
+template <typename T>
+Status IRuntimeFilter::_update_filter(const T* param) {
     if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
         const PInFilter in_filter = param->request->in_filter();
         set_ignored(in_filter.ignored_msg());
@@ -1691,26 +1675,15 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParams* param) {
     return Status::OK();
 }
 
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
+    return _update_filter(param);
+}
+
 Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
                                      int64_t start_apply) {
-    if (param->request->has_in_filter() && 
param->request->in_filter().has_ignored_msg()) {
-        const PInFilter in_filter = param->request->in_filter();
-        set_ignored(in_filter.ignored_msg());
-    }
-
-    std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
-    RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, 
&tmp_wrapper));
-    auto origin_type = _wrapper->get_real_type();
-    RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get()));
-    if (origin_type != _wrapper->get_real_type()) {
-        update_runtime_filter_type_to_profile();
-    }
-    this->signal();
-
-    _profile->add_info_string("MergeTime", 
std::to_string(param->request->merge_time()) + " ms");
     _profile->add_info_string("UpdateTime",
                               std::to_string(MonotonicMillis() - start_apply) 
+ " ms");
-    return Status::OK();
+    return _update_filter(param);
 }
 
 Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>&
 probe_ctxs,
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index fc324c1c1be..bc487bfe9c9 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -218,8 +218,7 @@ public:
                          bool build_bf_exactly = false, bool is_global = false,
                          int parallel_tasks = 0);
 
-    void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& 
context);
-    Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& 
context);
+    vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
 
     void copy_from_other(IRuntimeFilter* other);
 
@@ -367,6 +366,9 @@ protected:
     void to_protobuf(PInFilter* filter);
     void to_protobuf(PMinMaxFilter* filter);
 
+    template <class T>
+    Status _update_filter(const T* param);
+
     template <class T>
     Status serialize_impl(T* request, void** data, int* len);
 
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 4859734a6a4..8f5dab22f8c 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -91,6 +91,10 @@ public:
         std::sort(sorted_runtime_filter_descs.begin(), 
sorted_runtime_filter_descs.end(),
                   compare_desc);
 
+        // do not create 'in filter' when hash_table size over limit
+        const auto max_in_num = state->runtime_filter_max_in_num();
+        const bool over_max_in_num = (hash_table_size >= max_in_num);
+
         for (auto& filter_desc : sorted_runtime_filter_descs) {
             IRuntimeFilter* runtime_filter = nullptr;
             
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
@@ -103,10 +107,6 @@ public:
                         runtime_filter->expr_order(), 
_build_expr_context.size());
             }
 
-            // do not create 'in filter' when hash_table size over limit
-            auto max_in_num = state->runtime_filter_max_in_num();
-            bool over_max_in_num = (hash_table_size >= max_in_num);
-
             bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
 
             if (over_max_in_num &&
@@ -213,8 +213,7 @@ public:
     void copy_to_shared_context(vectorized::SharedHashTableContextPtr& 
context) {
         for (auto& it : _runtime_filters) {
             for (auto& filter : it.second) {
-                auto& target = context->runtime_filters[filter->filter_id()];
-                filter->copy_to_shared_context(target);
+                context->runtime_filters[filter->filter_id()] = 
filter->get_shared_context_ref();
             }
         }
     }
@@ -227,7 +226,7 @@ public:
                 if (ret == context->runtime_filters.end()) {
                     return Status::Aborted("invalid runtime filter id: {}", 
filter_id);
                 }
-                RETURN_IF_ERROR(filter->copy_from_shared_context(ret->second));
+                filter->get_shared_context_ref() = ret->second;
             }
         }
         return Status::OK();
diff --git a/be/src/exprs/runtime_filter_slots_cross.h 
b/be/src/exprs/runtime_filter_slots_cross.h
index 76b6085bab9..7b1a2063d15 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -82,7 +82,7 @@ public:
         return Status::OK();
     }
 
-    bool empty() { return _runtime_filters.empty(); }
+    bool empty() const { return _runtime_filters.empty(); }
 
 private:
     const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 29bf22535ed..3a01368b583 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -116,7 +116,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
         }
     }
 
-    // TODO: make the two case as one case to judge
+    // TODO: unify the remote-opt and global cases into a single check
     bool remote_opt_or_global =
             (desc.__isset.opt_remote_rf && desc.opt_remote_rf && 
desc.has_remote_targets &&
              desc.type == TRuntimeFilterType::BLOOM) ||


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to