This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4e4fb33995 [refactor](conjuncts) simplify conjuncts in exec node (#19254) 4e4fb33995 is described below commit 4e4fb339951c79252b4a728d1646b754e044199f Author: yiguolei <676222...@qq.com> AuthorDate: Thu May 4 18:04:32 2023 +0800 [refactor](conjuncts) simplify conjuncts in exec node (#19254) Co-authored-by: yiguolei <yiguo...@gmail.com> Currently, the exec node saves an exprcontext**, but the object itself lives in the object pool, which makes the code very unclear. We could just use exprcontext*. --- be/src/exec/base_scanner.cpp | 9 ++-- be/src/exec/base_scanner.h | 2 +- be/src/exec/exec_node.cpp | 15 +++---- be/src/exec/exec_node.h | 2 +- be/src/exec/scan_node.cpp | 9 ++-- be/src/vec/exec/scan/new_es_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_es_scanner.cpp | 2 +- be/src/vec/exec/scan/new_es_scanner.h | 2 +- be/src/vec/exec/scan/new_file_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_jdbc_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_jdbc_scanner.cpp | 4 +- be/src/vec/exec/scan/new_jdbc_scanner.h | 2 +- be/src/vec/exec/scan/new_odbc_scan_node.cpp | 2 +- be/src/vec/exec/scan/new_odbc_scanner.cpp | 4 +- be/src/vec/exec/scan/new_odbc_scanner.h | 2 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 4 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 6 +-- be/src/vec/exec/scan/vfile_scanner.cpp | 11 +++-- be/src/vec/exec/scan/vfile_scanner.h | 4 +- be/src/vec/exec/scan/vmeta_scan_node.cpp | 2 +- be/src/vec/exec/scan/vmeta_scanner.cpp | 2 +- be/src/vec/exec/scan/vmeta_scanner.h | 2 +- be/src/vec/exec/scan/vscan_node.cpp | 68 +++++++++++++---------------- be/src/vec/exec/scan/vscan_node.h | 5 +-- be/src/vec/exec/scan/vscanner.cpp | 4 +- be/src/vec/exec/scan/vscanner.h | 2 +- be/src/vec/exec/vaggregation_node.cpp | 6 +-- be/src/vec/exprs/vexpr_context.cpp | 13 +----- be/src/vec/exprs/vexpr_context.h | 2 - 29 files changed, 84 insertions(+), 108 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp 
index 587be2f35e..ffe6bc1815 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -143,11 +143,10 @@ Status BaseScanner::init_expr_ctxes() { if (!_pre_filter_texprs.empty()) { // for vectorized, preceding filter exprs should be compounded to one passed from fe. DCHECK(_pre_filter_texprs.size() == 1); - _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( - _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc)); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); + _state->obj_pool(), _pre_filter_texprs[0], &_vpre_filter_ctx_ptr)); + RETURN_IF_ERROR(_vpre_filter_ctx_ptr->prepare(_state, *_row_desc)); + RETURN_IF_ERROR(_vpre_filter_ctx_ptr->open(_state)); } // Construct dest slots information @@ -365,7 +364,7 @@ Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) { void BaseScanner::close() { if (_vpre_filter_ctx_ptr) { - (*_vpre_filter_ctx_ptr)->close(_state); + _vpre_filter_ctx_ptr->close(_state); } } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index ee4ae973ef..0aad811667 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -131,7 +131,7 @@ protected: // for vectorized load std::vector<vectorized::VExprContext*> _dest_vexpr_ctx; - std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr; + vectorized::VExprContext* _vpre_filter_ctx_ptr = nullptr; vectorized::Block _src_block; bool _src_block_mem_reuse = false; int _num_of_columns_from_file; diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 68360adba2..ba807bd77d 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -104,9 +104,8 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { init_runtime_profile(get_name()); if (tnode.__isset.vconjunct) { - _vconjunct_ctx_ptr.reset(new 
doris::vectorized::VExprContext*); RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree(_pool, tnode.vconjunct, - _vconjunct_ctx_ptr.get())); + &_vconjunct_ctx_ptr)); } // create the projections expr @@ -131,8 +130,8 @@ Status ExecNode::prepare(RuntimeState* state) { _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), _runtime_profile.get(), nullptr, "PeakMemoryUsage"); - if (_vconjunct_ctx_ptr) { - RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, intermediate_row_desc())); + if (_vconjunct_ctx_ptr != nullptr) { + RETURN_IF_ERROR(_vconjunct_ctx_ptr->prepare(state, intermediate_row_desc())); } RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc())); @@ -145,8 +144,8 @@ Status ExecNode::prepare(RuntimeState* state) { } Status ExecNode::alloc_resource(doris::RuntimeState* state) { - if (_vconjunct_ctx_ptr) { - RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state)); + if (_vconjunct_ctx_ptr != nullptr) { + RETURN_IF_ERROR(_vconjunct_ctx_ptr->open(state)); } RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state)); return Status::OK(); @@ -178,8 +177,8 @@ void ExecNode::release_resource(doris::RuntimeState* state) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } - if (_vconjunct_ctx_ptr) { - (*_vconjunct_ctx_ptr)->close(state); + if (_vconjunct_ctx_ptr != nullptr) { + _vconjunct_ctx_ptr->close(state); } vectorized::VExpr::close(_projections, state); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 68f84934ad..edbec218d7 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -258,7 +258,7 @@ protected: ObjectPool* _pool; std::vector<TupleId> _tuple_ids; - std::unique_ptr<doris::vectorized::VExprContext*> _vconjunct_ctx_ptr; + doris::vectorized::VExprContext* _vconjunct_ctx_ptr = nullptr; std::vector<ExecNode*> _children; RowDescriptor _row_descriptor; diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp index 281a55b6f3..00496b306f 
100644 --- a/be/src/exec/scan_node.cpp +++ b/be/src/exec/scan_node.cpp @@ -65,16 +65,15 @@ void ScanNode::_peel_pushed_vconjunct(RuntimeState* state, } int leaf_index = 0; - vectorized::VExpr* conjunct_expr_root = (*_vconjunct_ctx_ptr)->root(); + vectorized::VExpr* conjunct_expr_root = _vconjunct_ctx_ptr->root(); if (conjunct_expr_root != nullptr) { vectorized::VExpr* new_conjunct_expr_root = vectorized::VectorizedUtils::dfs_peel_conjunct( - state, *_vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker); + state, _vconjunct_ctx_ptr, conjunct_expr_root, leaf_index, checker); if (new_conjunct_expr_root == nullptr) { - (*_vconjunct_ctx_ptr)->close(state); - _vconjunct_ctx_ptr.reset(nullptr); + _vconjunct_ctx_ptr->close(state); } else { - (*_vconjunct_ctx_ptr)->set_root(new_conjunct_expr_root); + _vconjunct_ctx_ptr->set_root(new_conjunct_expr_root); } } } diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index d684dead0a..a037761278 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -167,7 +167,7 @@ Status NewEsScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { _state, this, _limit_per_scanner, _tuple_id, properties, _docvalue_context, doc_value_mode, _state->runtime_profile()); - RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr)); scanners->push_back(scanner); } return Status::OK(); diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp index a33d4a59d3..d65ad50615 100644 --- a/be/src/vec/exec/scan/new_es_scanner.cpp +++ b/be/src/vec/exec/scan/new_es_scanner.cpp @@ -58,7 +58,7 @@ NewEsScanner::NewEsScanner(RuntimeState* state, NewEsScanNode* parent, int64_t l _docvalue_context(docvalue_context), _doc_value_mode(doc_value_mode) {} -Status NewEsScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { +Status 
NewEsScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) { VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare"; RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); diff --git a/be/src/vec/exec/scan/new_es_scanner.h b/be/src/vec/exec/scan/new_es_scanner.h index e855f514d4..28a9872cd5 100644 --- a/be/src/vec/exec/scan/new_es_scanner.h +++ b/be/src/vec/exec/scan/new_es_scanner.h @@ -60,7 +60,7 @@ public: Status close(RuntimeState* state) override; public: - Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); + Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 3bbc8fc978..ed8b80ab57 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -111,7 +111,7 @@ Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { VFileScanner::create_unique(_state, this, _limit_per_scanner, scan_range.scan_range.ext_scan_range.file_scan_range, runtime_profile(), _kv_cache.get()); - RETURN_IF_ERROR(scanner->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range, + RETURN_IF_ERROR(scanner->prepare(_vconjunct_ctx_ptr, &_colname_to_value_range, &_colname_to_slot_id)); scanners->push_back(std::move(scanner)); } diff --git a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp index f9f794ccb6..8b240ef9c6 100644 --- a/be/src/vec/exec/scan/new_jdbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scan_node.cpp @@ -68,7 +68,7 @@ Status NewJdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { std::unique_ptr<NewJdbcScanner> scanner = NewJdbcScanner::create_unique(_state, this, _limit_per_scanner, _tuple_id, _query_string, _table_type, _state->runtime_profile()); - RETURN_IF_ERROR(scanner->prepare(_state, 
_vconjunct_ctx_ptr.get())); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr)); scanners->push_back(std::move(scanner)); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 1de4a10cf9..150203035b 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -54,11 +54,11 @@ NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int _connector_close_timer = ADD_TIMER(get_parent()->_scanner_profile, "ConnectorCloseTime"); } -Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { +Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) { VLOG_CRITICAL << "NewJdbcScanner::Prepare"; if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. - RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(state, &_vconjunct_ctx)); + RETURN_IF_ERROR(vconjunct_ctx_ptr->clone(state, &_vconjunct_ctx)); } if (_is_init) { diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h index ef436d5aec..8db47fb95f 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.h +++ b/be/src/vec/exec/scan/new_jdbc_scanner.h @@ -52,7 +52,7 @@ public: Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); + Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.cpp b/be/src/vec/exec/scan/new_odbc_scan_node.cpp index 17d7f7784f..856a5f8de5 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.cpp +++ b/be/src/vec/exec/scan/new_odbc_scan_node.cpp @@ -67,7 +67,7 @@ Status NewOdbcScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { } 
std::shared_ptr<NewOdbcScanner> scanner = NewOdbcScanner::create_shared( _state, this, _limit_per_scanner, _odbc_scan_node, _state->runtime_profile()); - RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr)); scanners->push_back(scanner); return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_odbc_scanner.cpp b/be/src/vec/exec/scan/new_odbc_scanner.cpp index 5c912909b4..8494973f08 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_odbc_scanner.cpp @@ -61,11 +61,11 @@ NewOdbcScanner::NewOdbcScanner(RuntimeState* state, NewOdbcScanNode* parent, int _tuple_id(odbc_scan_node.tuple_id), _tuple_desc(nullptr) {} -Status NewOdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { +Status NewOdbcScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) { VLOG_CRITICAL << NEW_SCANNER_TYPE << "::prepare"; if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. 
- RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(state, &_vconjunct_ctx)); + RETURN_IF_ERROR(vconjunct_ctx_ptr->clone(state, &_vconjunct_ctx)); } if (_is_init) { diff --git a/be/src/vec/exec/scan/new_odbc_scanner.h b/be/src/vec/exec/scan/new_odbc_scanner.h index 62043f6ff7..0fbafd0c5c 100644 --- a/be/src/vec/exec/scan/new_odbc_scanner.h +++ b/be/src/vec/exec/scan/new_odbc_scanner.h @@ -56,7 +56,7 @@ public: Status close(RuntimeState* state) override; public: - Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); + Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 6e94e152b2..79d4398461 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -431,9 +431,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { SCOPED_TIMER(_scanner_init_timer); auto span = opentelemetry::trace::Tracer::GetCurrentSpan(); - if (_vconjunct_ctx_ptr && (*_vconjunct_ctx_ptr)->root()) { + if (_vconjunct_ctx_ptr && _vconjunct_ctx_ptr->root()) { _runtime_profile->add_info_string("RemainedDownPredicates", - (*_vconjunct_ctx_ptr)->root()->debug_string()); + _vconjunct_ctx_ptr->root()->debug_string()); } if (!_olap_scan_node.output_column_unique_ids.empty()) { diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 9310db989c..e87f851126 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -100,11 +100,11 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema, Status NewOlapScanner::init() { _is_init = true; auto parent = static_cast<NewOlapScanNode*>(_parent); - RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr.get())); + 
RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr)); if (parent->_common_vexpr_ctxs_pushdown != nullptr) { // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary. - RETURN_IF_ERROR((*parent->_common_vexpr_ctxs_pushdown) - ->clone(_state, &_common_vexpr_ctxs_pushdown)); + RETURN_IF_ERROR( + parent->_common_vexpr_ctxs_pushdown->clone(_state, &_common_vexpr_ctxs_pushdown)); } // set limit to reduce end of rowset and segment mem use diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 4ea8de36c7..caf71684c2 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -97,7 +97,7 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t } Status VFileScanner::prepare( - VExprContext** vconjunct_ctx_ptr, + VExprContext* vconjunct_ctx_ptr, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, const std::unordered_map<std::string, int>* colname_to_slot_id) { RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); @@ -127,11 +127,10 @@ Status VFileScanner::prepare( std::vector<bool>({false}))); // prepare pre filters if (_params.__isset.pre_filter_exprs) { - _pre_conjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_tree( - _state->obj_pool(), _params.pre_filter_exprs, _pre_conjunct_ctx_ptr.get())); - RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->prepare(_state, *_src_row_desc)); - RETURN_IF_ERROR((*_pre_conjunct_ctx_ptr)->open(_state)); + _state->obj_pool(), _params.pre_filter_exprs, &_pre_conjunct_ctx_ptr)); + RETURN_IF_ERROR(_pre_conjunct_ctx_ptr->prepare(_state, *_src_row_desc)); + RETURN_IF_ERROR(_pre_conjunct_ctx_ptr->open(_state)); } } @@ -851,7 +850,7 @@ Status VFileScanner::close(RuntimeState* state) { } if (_pre_conjunct_ctx_ptr) { - (*_pre_conjunct_ctx_ptr)->close(state); + 
_pre_conjunct_ctx_ptr->close(state); } if (_push_down_expr) { diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index fc1da0ec6c..54a1df77d3 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -71,7 +71,7 @@ public: Status close(RuntimeState* state) override; - Status prepare(VExprContext** vconjunct_ctx_ptr, + Status prepare(VExprContext* vconjunct_ctx_ptr, std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range, const std::unordered_map<std::string, int>* colname_to_slot_id); @@ -128,7 +128,7 @@ protected: std::unordered_set<std::string> _missing_cols; // For load task - std::unique_ptr<doris::vectorized::VExprContext*> _pre_conjunct_ctx_ptr; + doris::vectorized::VExprContext* _pre_conjunct_ctx_ptr = nullptr; std::unique_ptr<RowDescriptor> _src_row_desc; // row desc for default exprs std::unique_ptr<RowDescriptor> _default_val_row_desc; diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp index deb4980f23..3dec257dca 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.cpp +++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp @@ -65,7 +65,7 @@ Status VMetaScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { for (auto& scan_range : _scan_ranges) { std::shared_ptr<VMetaScanner> scanner = VMetaScanner::create_shared( _state, this, _tuple_id, scan_range, _limit_per_scanner, runtime_profile()); - RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr.get())); + RETURN_IF_ERROR(scanner->prepare(_state, _vconjunct_ctx_ptr)); scanners->push_back(scanner); } return Status::OK(); diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index db17ea9ac4..50e81ebe09 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -70,7 +70,7 @@ Status VMetaScanner::open(RuntimeState* state) { return Status::OK(); } -Status 
VMetaScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { +Status VMetaScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) { VLOG_CRITICAL << "VMetaScanner::prepare"; RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index e1de191342..3cac485cb2 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -55,7 +55,7 @@ public: Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); + Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr); protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index f926ab15c6..febcf72f18 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -391,8 +391,8 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) { } VExpr* last_expr = nullptr; - if (_vconjunct_ctx_ptr) { - last_expr = (*_vconjunct_ctx_ptr)->root(); + if (_vconjunct_ctx_ptr != nullptr) { + last_expr = _vconjunct_ctx_ptr->root(); } else { DCHECK(_rf_vexpr_set.find(vexprs[0]) == _rf_vexpr_set.end()); last_expr = vexprs[0]; @@ -430,15 +430,14 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) { } auto new_vconjunct_ctx_ptr = _pool->add(VExprContext::create_unique(last_expr).release()); if (_vconjunct_ctx_ptr) { - (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr); + _vconjunct_ctx_ptr->clone_fn_contexts(new_vconjunct_ctx_ptr); } RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, _row_descriptor)); RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(_state)); if (_vconjunct_ctx_ptr) { - 
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); + _stale_vexpr_ctxs.push_back(_vconjunct_ctx_ptr); } - _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); - *_vconjunct_ctx_ptr = new_vconjunct_ctx_ptr; + _vconjunct_ctx_ptr = new_vconjunct_ctx_ptr; return Status::OK(); } @@ -468,10 +467,10 @@ void VScanNode::release_resource(RuntimeState* state) { } for (auto& ctx : _stale_vexpr_ctxs) { - (*ctx)->close(state); + ctx->close(state); } if (_common_vexpr_ctxs_pushdown) { - (*_common_vexpr_ctxs_pushdown)->close(state); + _common_vexpr_ctxs_pushdown->close(state); } ExecNode::release_resource(state); @@ -538,18 +537,18 @@ Status VScanNode::_normalize_conjuncts() { } } if (_vconjunct_ctx_ptr) { - if ((*_vconjunct_ctx_ptr)->root()) { + if (_vconjunct_ctx_ptr->root()) { VExpr* new_root; - RETURN_IF_ERROR(_normalize_predicate((*_vconjunct_ctx_ptr)->root(), &new_root)); + RETURN_IF_ERROR(_normalize_predicate(_vconjunct_ctx_ptr->root(), &new_root)); if (new_root) { - (*_vconjunct_ctx_ptr)->set_root(new_root); + _vconjunct_ctx_ptr->set_root(new_root); if (_should_push_down_common_expr()) { - _common_vexpr_ctxs_pushdown = std::move(_vconjunct_ctx_ptr); - _vconjunct_ctx_ptr.reset(nullptr); + _common_vexpr_ctxs_pushdown = _vconjunct_ctx_ptr; + _vconjunct_ctx_ptr = nullptr; } } else { // All conjucts are pushed down as predicate column - _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); - _vconjunct_ctx_ptr.reset(nullptr); + _stale_vexpr_ctxs.push_back(_vconjunct_ctx_ptr); + _vconjunct_ctx_ptr = nullptr; } } } @@ -604,7 +603,7 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output SlotDescriptor* slot = nullptr; ColumnValueRangeType* range = nullptr; PushDownType pdt = PushDownType::UNACCEPTABLE; - RETURN_IF_ERROR(_eval_const_conjuncts(cur_expr, *_vconjunct_ctx_ptr, &pdt)); + RETURN_IF_ERROR(_eval_const_conjuncts(cur_expr, _vconjunct_ctx_ptr, &pdt)); if (pdt == PushDownType::ACCEPTABLE) { *output_expr = nullptr; return 
Status::OK(); @@ -618,28 +617,23 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output is_runtimer_filter_predicate); }}; RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, - &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt)); RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, - &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt)); RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, - &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt)); RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, - &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt)); RETURN_IF_PUSH_DOWN(_normalize_match_predicate( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, - &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, value_range, &pdt)); if (_is_key_column(slot->col_name())) { RETURN_IF_PUSH_DOWN(_normalize_bitmap_filter( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, &pdt)); RETURN_IF_PUSH_DOWN(_normalize_bloom_filter( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, &pdt)); if (_state->enable_function_pushdown()) { RETURN_IF_PUSH_DOWN(_normalize_function_filters( - cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &pdt)); + cur_expr, _vconjunct_ctx_ptr, slot, &pdt)); } } }, @@ -648,7 +642,7 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output if (pdt == PushDownType::UNACCEPTABLE && TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) { - _normalize_compound_predicate(cur_expr, *(_vconjunct_ctx_ptr.get()), &pdt, + _normalize_compound_predicate(cur_expr, _vconjunct_ctx_ptr, &pdt, is_runtimer_filter_predicate, in_predicate_checker, 
eq_predicate_checker); *output_expr = conjunct_expr_root; // remaining in conjunct tree @@ -676,18 +670,18 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output } else { if (left_child == nullptr) { conjunct_expr_root->children()[0]->close( - _state, *_vconjunct_ctx_ptr, - (*_vconjunct_ctx_ptr)->get_function_state_scope()); + _state, _vconjunct_ctx_ptr, + _vconjunct_ctx_ptr->get_function_state_scope()); } if (right_child == nullptr) { conjunct_expr_root->children()[1]->close( - _state, *_vconjunct_ctx_ptr, - (*_vconjunct_ctx_ptr)->get_function_state_scope()); + _state, _vconjunct_ctx_ptr, + _vconjunct_ctx_ptr->get_function_state_scope()); } // here only close the and expr self, do not close the child conjunct_expr_root->set_children({}); - conjunct_expr_root->close(_state, *_vconjunct_ctx_ptr, - (*_vconjunct_ctx_ptr)->get_function_state_scope()); + conjunct_expr_root->close(_state, _vconjunct_ctx_ptr, + _vconjunct_ctx_ptr->get_function_state_scope()); } // here do not close VExpr* now @@ -1358,7 +1352,7 @@ Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { Status VScanNode::clone_vconjunct_ctx(VExprContext** _vconjunct_ctx) { if (_vconjunct_ctx_ptr) { std::unique_lock l(_rf_locks); - return (*_vconjunct_ctx_ptr)->clone(_state, _vconjunct_ctx); + return _vconjunct_ctx_ptr->clone(_state, _vconjunct_ctx); } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 13361bdde8..96fa98c070 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -269,7 +269,6 @@ protected: // Set to true if the runtime filter is ready. 
std::vector<bool> _runtime_filter_ready_flag; doris::Mutex _rf_locks; - std::map<int, RuntimeFilterContext*> _conjunct_id_to_runtime_filter_ctxs; phmap::flat_hash_set<VExpr*> _rf_vexpr_set; // True means all runtime filters are applied to scanners bool _is_all_rf_applied = true; @@ -322,8 +321,8 @@ protected: // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector // so that it will be destroyed uniformly at the end of the query. - std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs; - std::unique_ptr<VExprContext*> _common_vexpr_ctxs_pushdown = nullptr; + std::vector<VExprContext*> _stale_vexpr_ctxs; + VExprContext* _common_vexpr_ctxs_pushdown = nullptr; // If sort info is set, push limit to each scanner; int64_t _limit_per_scanner = -1; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index e2c9135397..f3144c8c62 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -40,10 +40,10 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, Runtim _is_load = (_input_tuple_desc != nullptr); } -Status VScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr) { +Status VScanner::prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr) { if (vconjunct_ctx_ptr != nullptr) { // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx. 
- RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx)); + RETURN_IF_ERROR(vconjunct_ctx_ptr->clone(_state, &_vconjunct_ctx)); } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index fe9744ba65..4fcc019fdc 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -77,7 +77,7 @@ protected: Status _filter_output_block(Block* block); // Not virtual, all child will call this method explictly - Status prepare(RuntimeState* state, VExprContext** vconjunct_ctx_ptr); + Status prepare(RuntimeState* state, VExprContext* vconjunct_ctx_ptr); public: VScanNode* get_parent() { return _parent; } diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index e998b2127d..4c09e2b16c 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -473,9 +473,9 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { std::bind<void>(&AggregationNode::_update_memusage_with_serialized_key, this); _executor.close = std::bind<void>(&AggregationNode::_close_with_serialized_key, this); - _should_limit_output = _limit != -1 && // has limit - !_vconjunct_ctx_ptr && // no having conjunct - _needs_finalize; // agg's finalize step + _should_limit_output = _limit != -1 && // has limit + _vconjunct_ctx_ptr == nullptr && // no having conjunct + _needs_finalize; // agg's finalize step } return Status::OK(); diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index a22b61e33a..37155455bf 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -88,7 +88,7 @@ void VExprContext::close(doris::RuntimeState* state) { } doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) { - DCHECK(_prepared); + DCHECK(_prepared) << "expr context not prepared"; DCHECK(_opened); DCHECK(*new_ctx == nullptr); @@ -127,17 +127,6 @@ Status 
VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block, int col return Block::filter_block(block, result_column_id, column_to_keep); } -Status VExprContext::filter_block(const std::unique_ptr<VExprContext*>& vexpr_ctx_ptr, Block* block, - int column_to_keep) { - if (vexpr_ctx_ptr == nullptr || block->rows() == 0) { - return Status::OK(); - } - DCHECK((*vexpr_ctx_ptr) != nullptr); - int result_column_id = -1; - RETURN_IF_ERROR((*vexpr_ctx_ptr)->execute(block, &result_column_id)); - return Block::filter_block(block, result_column_id, column_to_keep); -} - Block VExprContext::get_output_block_after_execute_exprs( const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, const Block& input_block, Status& status) { diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 0ebc2ae422..1629860f6b 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -68,8 +68,6 @@ public: [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block, int column_to_keep); - [[nodiscard]] static Status filter_block(const std::unique_ptr<VExprContext*>& vexpr_ctx_ptr, - Block* block, int column_to_keep); static Block get_output_block_after_execute_exprs(const std::vector<vectorized::VExprContext*>&, const Block&, Status&); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org