This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 7fca026471 [Bug](runtime-filter) fix probe expr prepared twice on minmax runtime filter (#22229) (#22337) 7fca026471 is described below commit 7fca0264717a19718f7fbb33f8bf0566698c207a Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri Jul 28 17:34:45 2023 +0800 [Bug](runtime-filter) fix probe expr prepared twice on minmax runtime filter (#22229) (#22337) --- be/src/exprs/runtime_filter.cpp | 74 +++++++++++----------- be/src/exprs/runtime_filter.h | 5 +- be/src/vec/exec/runtime_filter_consumer.cpp | 6 +- be/src/vec/exec/runtime_filter_consumer.h | 1 + .../query_p0/sql_functions/test_in_expr.groovy | 12 ++-- 5 files changed, 51 insertions(+), 47 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 27f5ef843a..0efb9d6bfa 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -609,8 +609,8 @@ public: return 0; } - Status get_push_exprs(std::vector<vectorized::VExprSPtr>* container, - const vectorized::VExprContextSPtr& prob_expr); + Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, + std::vector<vectorized::VExprSPtr>& push_exprs, const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && @@ -1152,17 +1152,18 @@ Status IRuntimeFilter::publish() { } } -Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, +Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, + std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival) { DCHECK(is_consumer()); - _profile->add_info_string("Info", _format_status()); if (_is_ignored) { return Status::OK(); } if (!is_late_arrival) { _set_push_down(); } - return _wrapper->get_push_exprs(push_exprs, _vprobe_ctx); + _profile->add_info_string("Info", _format_status()); + return _wrapper->get_push_exprs(probe_ctxs, push_exprs, _probe_expr); } bool IRuntimeFilter::await() { @@ -1357,10 +1358,9 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue DCHECK(is_consumer()); const auto iter = desc->planId_to_target_expr.find(node_id); if (iter == desc->planId_to_target_expr.end()) { - DCHECK(false) << "runtime filter not found node_id:" << node_id; - return Status::InternalError("not found a node id"); + return Status::InternalError("not found a node id:{}", node_id); } - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(iter->second, _vprobe_ctx)); + _probe_expr = iter->second; } if (_state) { @@ -1839,15 +1839,18 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, return Status::OK(); } -Status RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr>* container, - const vectorized::VExprContextSPtr& prob_expr) { - DCHECK(container != nullptr); - DCHECK(_pool != nullptr); - DCHECK(prob_expr->root()->type().type == _column_return_type || - (is_string_type(prob_expr->root()->type().type) && +Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, + std::vector<vectorized::VExprSPtr>& container, + const TExpr& probe_expr) { + vectorized::VExprContextSPtr probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, probe_ctx)); + probe_ctxs.push_back(probe_ctx); + + DCHECK(probe_ctx->root()->type().type == _column_return_type || + (is_string_type(probe_ctx->root()->type().type) && is_string_type(_column_return_type)) || _filter_type == RuntimeFilterType::BITMAP_FILTER) - << " prob_expr->root()->type().type: " << prob_expr->root()->type().type + << " prob_expr->root()->type().type: " << probe_ctx->root()->type().type << " _column_return_type: " << _column_return_type << " _filter_type: " << ::doris::to_string(_filter_type); @@ -1868,10 +1871,9 @@ Status RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr auto in_pred = vectorized::VDirectInPredicate::create_shared(node); in_pred->set_filter(_context.hybrid_set); - auto cloned_expr = prob_expr->root()->clone(); - in_pred->add_child(cloned_expr); + in_pred->add_child(probe_ctx->root()); auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred); - container->push_back(wrapper); + container.push_back(wrapper); } break; } @@ -1879,29 +1881,31 @@ Status RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr vectorized::VExprSPtr max_pred; // create max filter TExprNode max_pred_node; - RETURN_IF_ERROR(create_vbin_predicate(prob_expr->root()->type(), TExprOpcode::LE, max_pred, + RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred, &max_pred_node)); vectorized::VExprSPtr max_literal; - RETURN_IF_ERROR(create_literal(prob_expr->root()->type(), _context.minmax_func->get_max(), + RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(), max_literal)); - auto cloned_expr = prob_expr->root()->clone(); - max_pred->add_child(cloned_expr); + max_pred->add_child(probe_ctx->root()); max_pred->add_child(max_literal); - container->push_back( + container.push_back( vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred)); + vectorized::VExprContextSPtr new_probe_ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); + probe_ctxs.push_back(new_probe_ctx); + // create min filter vectorized::VExprSPtr min_pred; TExprNode min_pred_node; - RETURN_IF_ERROR(create_vbin_predicate(prob_expr->root()->type(), TExprOpcode::GE, min_pred, - &min_pred_node)); + RETURN_IF_ERROR(create_vbin_predicate(new_probe_ctx->root()->type(), TExprOpcode::GE, + min_pred, &min_pred_node)); vectorized::VExprSPtr min_literal; - RETURN_IF_ERROR(create_literal(prob_expr->root()->type(), _context.minmax_func->get_min(), - min_literal)); - cloned_expr = prob_expr->root()->clone(); - min_pred->add_child(cloned_expr); + RETURN_IF_ERROR(create_literal(new_probe_ctx->root()->type(), + _context.minmax_func->get_min(), min_literal)); + min_pred->add_child(new_probe_ctx->root()); min_pred->add_child(min_literal); - container->push_back( + container.push_back( vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred)); break; } @@ -1918,10 +1922,9 @@ Status RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr node.__set_is_nullable(false); auto bloom_pred = vectorized::VBloomPredicate::create_shared(node); bloom_pred->set_filter(_context.bloom_filter_func); - auto cloned_expr = prob_expr->root()->clone(); - bloom_pred->add_child(cloned_expr); + bloom_pred->add_child(probe_ctx->root()); auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, bloom_pred); - container->push_back(wrapper); + container.push_back(wrapper); break; } case RuntimeFilterType::BITMAP_FILTER: { @@ -1937,10 +1940,9 @@ Status RuntimePredicateWrapper::get_push_exprs(std::vector<vectorized::VExprSPtr node.__set_is_nullable(false); auto bitmap_pred = vectorized::VBitmapPredicate::create_shared(node); bitmap_pred->set_filter(_context.bitmap_filter_func); - auto cloned_expr = prob_expr->root()->clone(); - bitmap_pred->add_child(cloned_expr); + bitmap_pred->add_child(probe_ctx->root()); auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, bitmap_pred); - container->push_back(wrapper); + container.push_back(wrapper); break; } default: diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index fb5e43d177..2c23af6251 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -221,7 +221,8 @@ public: RuntimeFilterType type() const { return _runtime_filter_type; } - Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs, bool is_late_arrival); + Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>& probe_ctxs, + std::vector<vectorized::VExprSPtr>& push_exprs, bool is_late_arrival); bool is_broadcast_join() const { return _is_broadcast_join; } @@ -376,7 +377,7 @@ protected: // this filter won't filter any data bool _always_true; - doris::vectorized::VExprContextSPtr _vprobe_ctx; + TExpr _probe_expr; // Indicate whether runtime filter expr has been ignored bool _is_ignored; diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 2af841749b..c3bc8c0e22 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -95,7 +95,7 @@ Status RuntimeFilterConsumer::_acquire_runtime_filter() { ready = runtime_filter->await(); } if (ready && !_runtime_filter_ctxs[i].apply_mark) { - RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs, false)); + RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false)); _runtime_filter_ctxs[i].apply_mark = true; } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && !_runtime_filter_ctxs[i].apply_mark) { @@ -151,8 +151,8 @@ Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrive ++current_arrived_rf_num; continue; } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { - RETURN_IF_ERROR( - _runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs(&exprs, true)); + RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_push_expr_ctxs( + _probe_ctxs, exprs, true)); ++current_arrived_rf_num; _runtime_filter_ctxs[i].apply_mark = true; } diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index 4e4d53e818..a6527fae62 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -70,6 +70,7 @@ private: int32_t _filter_id; std::vector<TRuntimeFilterDesc> _runtime_filter_descs; + std::list<vectorized::VExprContextSPtr> _probe_ctxs; const RowDescriptor& _row_descriptor_ref; diff --git a/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy b/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy index cdaaa64126..d04a9471ac 100644 --- a/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy +++ b/regression-test/suites/query_p0/sql_functions/test_in_expr.groovy @@ -147,7 +147,7 @@ suite("test_in_expr", "query") { test { sql """ select c_array, c_array in (null) from array_in_test; """ - exception "NOT_IMPLEMENTED_ERROR" + exception "errCode" } sql " drop table if exists `json_in_test` " @@ -165,7 +165,7 @@ suite("test_in_expr", "query") { test { sql """ select j, j in (null) from json_in_test; """ - exception "NOT_IMPLEMENTED_ERROR" + exception "errCode" } sql " drop table if exists `bitmap_in_test` " @@ -185,11 +185,11 @@ suite("test_in_expr", "query") { sql """ insert into bitmap_in_test values (20200622, 1, to_bitmap(243));""" test { sql """ select device_id, device_id in (to_bitmap(1)) from bitmap_in_test; """ - exception "NOT_IMPLEMENTED_ERROR" + exception "errCode" } test { sql """ select device_id, device_id in (to_bitmap(1),to_bitmap(2),to_bitmap(243)) from bitmap_in_test; """ - exception "NOT_IMPLEMENTED_ERROR" + exception "errCode" } sql " drop table if exists `hll_in_test` " @@ -208,11 +208,11 @@ suite("test_in_expr", "query") { sql """ insert into hll_in_test values(1, hll_hash(1)) """ test { sql """ select id, pv in (hll_hash(1)) from hll_in_test; """ - exception "NOT_IMPLEMENTED_ERROR" + exception "errCode" } test { sql """ select id, pv in (hll_hash(1), hll_hash(2)) from hll_in_test; """ - exception "NOT_IMPLEMENTED_ERROR" + exception "errCode" } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org