This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7fca026471 [Bug](runtime-filter) fix probe expr prepared twice on minmax runtime filter (#22229) (#22337)
7fca026471 is described below

commit 7fca0264717a19718f7fbb33f8bf0566698c207a
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri Jul 28 17:34:45 2023 +0800

    [Bug](runtime-filter) fix probe expr prepared twice on minmax runtime filter (#22229) (#22337)
---
 be/src/exprs/runtime_filter.cpp                    | 74 +++++++++++-----------
 be/src/exprs/runtime_filter.h                      |  5 +-
 be/src/vec/exec/runtime_filter_consumer.cpp        |  6 +-
 be/src/vec/exec/runtime_filter_consumer.h          |  1 +
 .../query_p0/sql_functions/test_in_expr.groovy     | 12 ++--
 5 files changed, 51 insertions(+), 47 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 27f5ef843a..0efb9d6bfa 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -609,8 +609,8 @@ public:
         return 0;
     }
 
-    Status get_push_exprs(std::vector<vectorized::VExprSPtr>* container,
-                          const vectorized::VExprContextSPtr& prob_expr);
+    Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
+                          std::vector<vectorized::VExprSPtr>& push_exprs, 
const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
         bool can_not_merge_in_or_bloom = _filter_type == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -1152,17 +1152,18 @@ Status IRuntimeFilter::publish() {
     }
 }
 
-Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* 
push_exprs,
+Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
+                                          std::vector<vectorized::VExprSPtr>& 
push_exprs,
                                           bool is_late_arrival) {
     DCHECK(is_consumer());
-    _profile->add_info_string("Info", _format_status());
     if (_is_ignored) {
         return Status::OK();
     }
     if (!is_late_arrival) {
         _set_push_down();
     }
-    return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx);
+    _profile->add_info_string("Info", _format_status());
+    return _wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr);
 }
 
 bool IRuntimeFilter::await() {
@@ -1357,10 +1358,9 @@ Status IRuntimeFilter::init_with_desc(const 
TRuntimeFilterDesc* desc, const TQue
         DCHECK(is_consumer());
         const auto iter = desc->planId_to_target_expr.find(node_id);
         if (iter == desc->planId_to_target_expr.end()) {
-            DCHECK(false) << "runtime filter not found node_id:" << node_id;
-            return Status::InternalError("not found a node id");
+            return Status::InternalError("not found a node id:{}", node_id);
         }
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(iter->second, 
_vprobe_ctx));
+        _probe_expr = iter->second;
     }
 
     if (_state) {
@@ -1839,15 +1839,18 @@ Status IRuntimeFilter::update_filter(const 
UpdateRuntimeFilterParamsV2* param,
     return Status::OK();
 }
 
-Status 
RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr>* 
container,
-                                               const 
vectorized::VExprContextSPtr& prob_expr) {
-    DCHECK(container != nullptr);
-    DCHECK(_pool != nullptr);
-    DCHECK(prob_expr->root()->type().type == _column_return_type ||
-           (is_string_type(prob_expr->root()->type().type) &&
+Status 
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>&
 probe_ctxs,
+                                               
std::vector<vectorized::VExprSPtr>& container,
+                                               const TExpr& probe_expr) {
+    vectorized::VExprContextSPtr probe_ctx;
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
probe_ctx));
+    probe_ctxs.push_back(probe_ctx);
+
+    DCHECK(probe_ctx->root()->type().type == _column_return_type ||
+           (is_string_type(probe_ctx->root()->type().type) &&
             is_string_type(_column_return_type)) ||
            _filter_type == RuntimeFilterType::BITMAP_FILTER)
-            << " prob_expr->root()->type().type: " << 
prob_expr->root()->type().type
+            << " prob_expr->root()->type().type: " << 
probe_ctx->root()->type().type
             << " _column_return_type: " << _column_return_type
             << " _filter_type: " << ::doris::to_string(_filter_type);
 
@@ -1868,10 +1871,9 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr
 
             auto in_pred = vectorized::VDirectInPredicate::create_shared(node);
             in_pred->set_filter(_context.hybrid_set);
-            auto cloned_expr = prob_expr->root()->clone();
-            in_pred->add_child(cloned_expr);
+            in_pred->add_child(probe_ctx->root());
             auto wrapper = 
vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred);
-            container->push_back(wrapper);
+            container.push_back(wrapper);
         }
         break;
     }
@@ -1879,29 +1881,31 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr
         vectorized::VExprSPtr max_pred;
         // create max filter
         TExprNode max_pred_node;
-        RETURN_IF_ERROR(create_vbin_predicate(prob_expr->root()->type(), 
TExprOpcode::LE, max_pred,
+        RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), 
TExprOpcode::LE, max_pred,
                                               &max_pred_node));
         vectorized::VExprSPtr max_literal;
-        RETURN_IF_ERROR(create_literal(prob_expr->root()->type(), 
_context.minmax_func->get_max(),
+        RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), 
_context.minmax_func->get_max(),
                                        max_literal));
-        auto cloned_expr = prob_expr->root()->clone();
-        max_pred->add_child(cloned_expr);
+        max_pred->add_child(probe_ctx->root());
         max_pred->add_child(max_literal);
-        container->push_back(
+        container.push_back(
                 
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));
 
+        vectorized::VExprContextSPtr new_probe_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, 
new_probe_ctx));
+        probe_ctxs.push_back(new_probe_ctx);
+
         // create min filter
         vectorized::VExprSPtr min_pred;
         TExprNode min_pred_node;
-        RETURN_IF_ERROR(create_vbin_predicate(prob_expr->root()->type(), 
TExprOpcode::GE, min_pred,
-                                              &min_pred_node));
+        RETURN_IF_ERROR(create_vbin_predicate(new_probe_ctx->root()->type(), 
TExprOpcode::GE,
+                                              min_pred, &min_pred_node));
         vectorized::VExprSPtr min_literal;
-        RETURN_IF_ERROR(create_literal(prob_expr->root()->type(), 
_context.minmax_func->get_min(),
-                                       min_literal));
-        cloned_expr = prob_expr->root()->clone();
-        min_pred->add_child(cloned_expr);
+        RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(),
+                                       _context.minmax_func->get_min(), 
min_literal));
+        min_pred->add_child(new_probe_ctx->root());
         min_pred->add_child(min_literal);
-        container->push_back(
+        container.push_back(
                 
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
         break;
     }
@@ -1918,10 +1922,9 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr
         node.__set_is_nullable(false);
         auto bloom_pred = vectorized::VBloomPredicate::create_shared(node);
         bloom_pred->set_filter(_context.bloom_filter_func);
-        auto cloned_expr = prob_expr->root()->clone();
-        bloom_pred->add_child(cloned_expr);
+        bloom_pred->add_child(probe_ctx->root());
         auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
bloom_pred);
-        container->push_back(wrapper);
+        container.push_back(wrapper);
         break;
     }
     case RuntimeFilterType::BITMAP_FILTER: {
@@ -1937,10 +1940,9 @@ Status 
RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr
         node.__set_is_nullable(false);
         auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node);
         bitmap_pred->set_filter(_context.bitmap_filter_func);
-        auto cloned_expr = prob_expr->root()->clone();
-        bitmap_pred->add_child(cloned_expr);
+        bitmap_pred->add_child(probe_ctx->root());
         auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, 
bitmap_pred);
-        container->push_back(wrapper);
+        container.push_back(wrapper);
         break;
     }
     default:
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index fb5e43d177..2c23af6251 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -221,7 +221,8 @@ public:
 
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
-    Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, 
bool is_late_arrival);
+    Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& 
probe_ctxs,
+                              std::vector<vectorized::VExprSPtr>& push_exprs, 
bool is_late_arrival);
 
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
@@ -376,7 +377,7 @@ protected:
     // this filter won't filter any data
     bool _always_true;
 
-    doris::vectorized::VExprContextSPtr _vprobe_ctx;
+    TExpr _probe_expr;
 
     // Indicate whether runtime filter expr has been ignored
     bool _is_ignored;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 2af841749b..c3bc8c0e22 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -95,7 +95,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() {
             ready = runtime_filter->await();
         }
         if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs, 
false));
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, 
vexprs, false));
             _runtime_filter_ctxs[i].apply_mark = true;
         } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
                    !_runtime_filter_ctxs[i].apply_mark) {
@@ -151,8 +151,8 @@ Status 
RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive
             ++current_arrived_rf_num;
             continue;
         } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
-            RETURN_IF_ERROR(
-                    
_runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(&exprs, true));
+            
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(
+                    _probe_ctxs, exprs, true));
             ++current_arrived_rf_num;
             _runtime_filter_ctxs[i].apply_mark = true;
         }
diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h
index 4e4d53e818..a6527fae62 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -70,6 +70,7 @@ private:
     int32_t _filter_id;
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::list<vectorized::VExprContextSPtr> _probe_ctxs;
 
     const RowDescriptor& _row_descriptor_ref;
 
diff --git a/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy 
b/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy
index cdaaa64126..d04a9471ac 100644
--- a/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy
+++ b/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy
@@ -147,7 +147,7 @@ suite("test_in_expr", "query") {
 
     test {
         sql """ select c_array, c_array in (null) from array_in_test; """
-        exception "NOT_IMPLEMENTED_ERROR"
+        exception "errCode"
     }
 
     sql " drop table if exists `json_in_test` "
@@ -165,7 +165,7 @@ suite("test_in_expr", "query") {
 
     test {
         sql """ select j, j in (null) from json_in_test; """
-        exception "NOT_IMPLEMENTED_ERROR"
+        exception "errCode"
     }
 
     sql " drop table if exists `bitmap_in_test` "
@@ -185,11 +185,11 @@ suite("test_in_expr", "query") {
     sql """ insert into bitmap_in_test values (20200622, 1, to_bitmap(243));"""
     test {
         sql """ select device_id, device_id in (to_bitmap(1)) from 
bitmap_in_test; """
-        exception "NOT_IMPLEMENTED_ERROR"
+        exception "errCode"
     }
     test {
         sql """ select device_id, device_id in 
(to_bitmap(1),to_bitmap(2),to_bitmap(243)) from bitmap_in_test; """
-        exception "NOT_IMPLEMENTED_ERROR"
+        exception "errCode"
     }
 
     sql " drop table if exists `hll_in_test` "
@@ -208,11 +208,11 @@ suite("test_in_expr", "query") {
     sql """ insert into hll_in_test values(1, hll_hash(1)) """
     test {
         sql """ select id, pv in (hll_hash(1)) from hll_in_test; """
-        exception "NOT_IMPLEMENTED_ERROR"
+        exception "errCode"
     }
     test {
         sql """ select id, pv in (hll_hash(1), hll_hash(2)) from hll_in_test; 
"""
-        exception "NOT_IMPLEMENTED_ERROR"
+        exception "errCode"
     }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to