This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 31e40191a8 [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508) 31e40191a8 is described below commit 31e40191a8ba7e5295258e9be61e91c8ffb2fa33 Author: xiepengcheng01 <100340096+xiepengchen...@users.noreply.github.com> AuthorDate: Sun May 22 11:45:57 2022 +0800 [Refactor] add vpre_filter_expr for vectorized to improve performance (#9508) --- be/src/exec/base_scanner.cpp | 28 +++--- be/src/exec/base_scanner.h | 4 +- be/test/vec/exec/vbroker_scanner_test.cpp | 103 ++++++++++++++++++++- .../org/apache/doris/planner/LoadScanNode.java | 9 +- .../java/org/apache/doris/planner/PlanNode.java | 7 ++ gensrc/thrift/PlanNodes.thrift | 2 +- 6 files changed, 129 insertions(+), 24 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index e06b52de2f..005e64c703 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -130,11 +130,13 @@ Status BaseScanner::init_expr_ctxes() { // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor if (!_pre_filter_texprs.empty()) { if (_state->enable_vectorized_exec()) { - RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( - _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc, - _mem_tracker)); - RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state)); + // for vectorized, preceding filter exprs should be compounded to one passed from fe. + DCHECK(_pre_filter_texprs.size() == 1); + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); } else { RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs)); @@ -302,14 +304,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { Status BaseScanner::_filter_src_block() { auto origin_column_num = _src_block.columns(); // filter block - if (!_vpre_filter_ctxs.empty()) { - for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - auto old_rows = _src_block.rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block, - origin_column_num)); - _counter->num_rows_unselected += old_rows - _src_block.rows(); - } - } + auto old_rows = _src_block.rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block, + origin_column_num)); + _counter->num_rows_unselected += old_rows - _src_block.rows(); return Status::OK(); } @@ -453,8 +451,8 @@ void BaseScanner::close() { Expr::close(_pre_filter_ctxs, _state); } - if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) { - vectorized::VExpr::close(_vpre_filter_ctxs, _state); + if (_vpre_filter_ctx_ptr) { + (*_vpre_filter_ctx_ptr)->close(_state); } } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 1c2ce211b5..fe3e088d4e 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -62,7 +62,7 @@ public: if (_state->enable_vectorized_exec()) { vectorized::VExpr::close(_dest_vexpr_ctx, _state); } - }; + } virtual Status init_expr_ctxes(); // Open this scanner, will initialize information need to @@ -138,7 +138,7 @@ protected: // for vectorized load std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; - std::vector<vectorized::VExprContext*> _vpre_filter_ctxs; + std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr; vectorized::Block _src_block; int _num_of_columns_from_file; diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 713aefc4a7..428d82343c 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -363,7 +363,6 @@ TEST_F(VBrokerScannerTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, &_counter); auto st = scanner.open(); @@ -376,7 +375,6 @@ TEST_F(VBrokerScannerTest, normal) { ASSERT_TRUE(eof); auto columns = block->get_columns(); ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0]->get_int(0), 1); ASSERT_EQ(columns[0]->get_int(1), 4); ASSERT_EQ(columns[0]->get_int(2), 8); @@ -390,6 +388,105 @@ TEST_F(VBrokerScannerTest, normal) { ASSERT_EQ(columns[2]->get_int(2), 10); } +TEST_F(VBrokerScannerTest, normal_with_pre_filter) { + std::vector<TBrokerRangeDesc> ranges; + TBrokerRangeDesc range; + range.path = "./be/test/exec/test_data/broker_scanner/normal.csv"; + range.start_offset = 0; + range.size = -1; + range.splittable = true; + range.file_type = TFileType::FILE_LOCAL; + range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; + ranges.push_back(range); + + // init pre_filter expr: k1 < '8' + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + TExpr filter_expr; + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::BINARY_PRED); + expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN); + expr_node.__set_num_children(2); + expr_node.__isset.opcode = true; + expr_node.__set_opcode(TExprOpcode::LT); + expr_node.__isset.vector_opcode = true; + expr_node.__set_vector_opcode(TExprOpcode::LT); + expr_node.__isset.fn = true; + expr_node.fn.name.function_name = "lt"; + expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN; + expr_node.fn.ret_type = int_type; + expr_node.fn.has_var_args = false; + filter_expr.nodes.push_back(expr_node); + } + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::SLOT_REF); + expr_node.type = varchar_type; + expr_node.__set_num_children(0); + expr_node.__isset.slot_ref = true; + TSlotRef slot_ref; + slot_ref.__set_slot_id(4); + slot_ref.__set_tuple_id(1); + expr_node.__set_slot_ref(slot_ref); + expr_node.__isset.output_column = true; + expr_node.__set_output_column(0); + filter_expr.nodes.push_back(expr_node); + } + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::STRING_LITERAL); + expr_node.type = varchar_type; + expr_node.__set_num_children(0); + expr_node.__isset.string_literal = true; + TStringLiteral string_literal; + string_literal.__set_value("8"); + expr_node.__set_string_literal(string_literal); + filter_expr.nodes.push_back(expr_node); + } + _pre_filter.push_back(filter_expr); + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, + &_counter); + auto st = scanner.open(); + ASSERT_TRUE(st.ok()); + + std::unique_ptr<vectorized::Block> block(new vectorized::Block()); + bool eof = false; + // end of file + st = scanner.get_next(block.get(), &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); + auto columns = block->get_columns(); + ASSERT_EQ(columns.size(), 3); + + ASSERT_EQ(columns[0]->get_int(0), 1); + ASSERT_EQ(columns[0]->get_int(1), 4); + + ASSERT_EQ(columns[1]->get_int(0), 2); + ASSERT_EQ(columns[1]->get_int(1), 5); + + ASSERT_EQ(columns[2]->get_int(0), 3); + ASSERT_EQ(columns[2]->get_int(1), 6); +} + TEST_F(VBrokerScannerTest, normal2) { std::vector<TBrokerRangeDesc> ranges; @@ -406,7 +503,6 @@ TEST_F(VBrokerScannerTest, normal2) { range.start_offset = 0; range.size = 4; ranges.push_back(range); - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, &_counter); auto st = scanner.open(); @@ -440,7 +536,6 @@ TEST_F(VBrokerScannerTest, normal5) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, &_counter); auto st = scanner.open(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java index 9ca69c819c..9f063a47ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.rewrite.ExprRewriter; @@ -213,8 +214,12 @@ public abstract class LoadScanNode extends ScanNode { planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE); TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); if (!preFilterConjuncts.isEmpty()) { - for (Expr e : preFilterConjuncts) { - brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + if (Config.enable_vectorized_load && vpreFilterConjunct != null) { + brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); + } else { + for (Expr e : preFilterConjuncts) { + brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + } } } planNode.setBrokerScanNode(brokerScanNode); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index d43747ed11..f1bef30896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -109,6 +109,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> { // 4. Filter data by using "conjuncts". protected List<Expr> preFilterConjuncts = Lists.newArrayList(); + protected Expr vpreFilterConjunct = null; + // Fragment that this PlanNode is executed in. Valid only after this PlanNode has been // assigned to a fragment. Set and maintained by enclosing PlanFragment. protected PlanFragment fragment; @@ -904,6 +906,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> { initCompoundPredicate(vconjunct); } + if (!preFilterConjuncts.isEmpty()) { + vpreFilterConjunct = convertConjunctsToAndCompoundPredicate(preFilterConjuncts); + initCompoundPredicate(vpreFilterConjunct); + } + for (PlanNode child : children) { child.convertToVectoriezd(); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 94168a9074..d4d37e11ac 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -252,7 +252,7 @@ struct TBrokerScanNode { // Partition info used to process partition select in broker load 2: optional list<Exprs.TExpr> partition_exprs 3: optional list<Partitions.TRangePartition> partition_infos - 4: optional list<Exprs.TExpr> pre_filter_exprs + 4: optional list<Exprs.TExpr> pre_filter_exprs } struct TEsScanNode { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org