This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 15abafee71 [Bug](runtime filters) support late-arrival runtime filters (#11599) 15abafee71 is described below commit 15abafee71cfd1e75963e6bf20d3927840239984 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Aug 12 11:55:15 2022 +0800 [Bug](runtime filters) support late-arrival runtime filters (#11599) --- be/src/vec/exec/volap_scan_node.cpp | 45 ++--------------------------- be/src/vec/exec/volap_scanner.cpp | 3 ++ be/src/vec/exec/volap_scanner.h | 8 +++++ be/src/vec/exprs/vbloom_predicate.cpp | 16 ++++++++-- be/src/vec/exprs/vruntimefilter_wrapper.cpp | 14 +++++++-- be/src/vec/exprs/vruntimefilter_wrapper.h | 2 ++ 6 files changed, 41 insertions(+), 47 deletions(-) diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index cd4b3bb42d..0b2e814e55 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -420,9 +420,6 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { scanner->set_opened(); } - /* - // the following code will cause coredump when running tpcds_sf1 sqls, - // disable temporariy to avoid it, SHOULD BE FIX LATER std::vector<VExpr*> vexprs; auto& scanner_filter_apply_marks = *scanner->mutable_runtime_filter_marks(); DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size()); @@ -450,42 +447,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { if (!_runtime_filter_ready_flag[i]) { // Use all conjuncts and new arrival runtime filters to construct a new // expression tree here. - auto last_expr = - _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; - for (size_t j = _vconjunct_ctx_ptr ? 
0 : 1; j < vexprs.size(); j++) { - if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) { - continue; - } - TExprNode texpr_node; - texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); - texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); - texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); - VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); - new_node->add_child(last_expr); - new_node->add_child(vexprs[j]); - last_expr = new_node; - _rf_vexpr_set.insert(vexprs[j]); - } - auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); - auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc()); - // If error occurs in `prepare` or `open` phase, discard these runtime - // filters directly. - if (UNLIKELY(!expr_status.OK())) { - LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; - vexprs.clear(); - break; - } - expr_status = new_vconjunct_ctx_ptr->open(state); - if (UNLIKELY(!expr_status.OK())) { - LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; - vexprs.clear(); - break; - } - if (_vconjunct_ctx_ptr) { - _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); - } - _vconjunct_ctx_ptr.reset(new VExprContext*); - *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; + _append_rf_into_conjuncts(state, vexprs); _runtime_filter_ready_flag[i] = true; } } @@ -495,14 +457,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { if (!vexprs.empty()) { if (*scanner->vconjunct_ctx_ptr()) { - (*scanner->vconjunct_ctx_ptr())->close(state); - *scanner->vconjunct_ctx_ptr() = nullptr; + scanner->discard_conjuncts(); } WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, scanner->vconjunct_ctx_ptr()), "Something wrong for runtime filters: "); - scanner->set_use_pushdown_conjuncts(true); } - */ std::vector<Block*> blocks; diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index beb04ab0b8..9218fcac91 100644 --- 
a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -367,6 +367,9 @@ Status VOlapScanner::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } + for (auto& ctx : _stale_vexpr_ctxs) { + ctx->close(state); + } if (_vconjunct_ctx) { _vconjunct_ctx->close(state); } diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h index f36fd55adf..3708b5b45e 100644 --- a/be/src/vec/exec/volap_scanner.h +++ b/be/src/vec/exec/volap_scanner.h @@ -61,6 +61,12 @@ public: VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; } + void discard_conjuncts() { + _vconjunct_ctx->mark_as_stale(); + _stale_vexpr_ctxs.push_back(_vconjunct_ctx); + _vconjunct_ctx = nullptr; + } + void mark_to_need_to_close() { _need_to_close = true; } bool need_to_close() { return _need_to_close; } @@ -143,6 +149,8 @@ private: bool _need_to_close = false; TabletSchemaSPtr _tablet_schema; + + std::vector<VExprContext*> _stale_vexpr_ctxs; }; } // namespace vectorized diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index c708efdda0..83e1443ec6 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -19,6 +19,8 @@ #include <string_view> +#include "vec/data_types/data_type_nullable.h" + namespace doris::vectorized { VBloomPredicate::VBloomPredicate(const TExprNode& node) @@ -73,8 +75,18 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result size_t sz = argument_column->size(); res_data_column->resize(sz); auto ptr = ((ColumnVector<UInt8>*)res_data_column.get())->get_data().data(); - for (size_t i = 0; i < sz; i++) { - ptr[i] = _filter->find(reinterpret_cast<const void*>(argument_column->get_data_at(i).data)); + if (WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type)) + .is_string_or_fixed_string()) { + for (size_t i = 0; i < sz; i++) { + auto ele = argument_column->get_data_at(i); + const StringValue v(ele.data, 
ele.size); + ptr[i] = _filter->find(reinterpret_cast<const void*>(&v)); + } + } else { + for (size_t i = 0; i < sz; i++) { + ptr[i] = _filter->find( + reinterpret_cast<const void*>(argument_column->get_data_at(i).data)); + } } if (_data_type->is_nullable()) { auto null_map = ColumnVector<UInt8>::create(block->rows(), 0); diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 2bd353e934..cdc3d2d5dc 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -29,14 +29,20 @@ namespace doris::vectorized { VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* impl) - : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), _scan_rows(0) {} + : VExpr(node), + _impl(impl), + _always_true(false), + _filtered_rows(0), + _scan_rows(0), + _is_closed(false) {} VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& vexpr) : VExpr(vexpr), _impl(vexpr._impl), _always_true(vexpr._always_true), _filtered_rows(vexpr._filtered_rows.load()), - _scan_rows(vexpr._scan_rows.load()) {} + _scan_rows(vexpr._scan_rows.load()), + _is_closed(false) {} Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) { @@ -52,6 +58,10 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context, void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { + if (_is_closed) { + return; + } + _is_closed = true; _impl->close(state, context, scope); } diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index 91a6bdbcac..e04faba9ef 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -61,5 +61,7 @@ private: constexpr static double EXPECTED_FILTER_RATE = 0.2; std::string _expr_name; + + bool _is_closed; }; } // namespace 
doris::vectorized \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org