This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 15abafee71 [Bug](runtime filters) support late-arrival runtime filters (#11599)
15abafee71 is described below

commit 15abafee71cfd1e75963e6bf20d3927840239984
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Aug 12 11:55:15 2022 +0800

    [Bug](runtime filters) support late-arrival runtime filters (#11599)
---
 be/src/vec/exec/volap_scan_node.cpp         | 45 ++---------------------------
 be/src/vec/exec/volap_scanner.cpp           |  3 ++
 be/src/vec/exec/volap_scanner.h             |  8 +++++
 be/src/vec/exprs/vbloom_predicate.cpp       | 16 ++++++++--
 be/src/vec/exprs/vruntimefilter_wrapper.cpp | 14 +++++++--
 be/src/vec/exprs/vruntimefilter_wrapper.h   |  2 ++
 6 files changed, 41 insertions(+), 47 deletions(-)

diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index cd4b3bb42d..0b2e814e55 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -420,9 +420,6 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         scanner->set_opened();
     }
 
-    /*
-    // the following code will cause coredump when running tpcds_sf1 sqls,
-    // disable temporariy to avoid it, SHOULD BE FIX LATER
     std::vector<VExpr*> vexprs;
     auto& scanner_filter_apply_marks = 
*scanner->mutable_runtime_filter_marks();
     DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
@@ -450,42 +447,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
                     if (!_runtime_filter_ready_flag[i]) {
                         // Use all conjuncts and new arrival runtime filters 
to construct a new
                         // expression tree here.
-                        auto last_expr =
-                                _vconjunct_ctx_ptr ? 
(*_vconjunct_ctx_ptr)->root() : vexprs[0];
-                        for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < 
vexprs.size(); j++) {
-                            if (_rf_vexpr_set.find(vexprs[j]) != 
_rf_vexpr_set.end()) {
-                                continue;
-                            }
-                            TExprNode texpr_node;
-                            
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
-                            
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
-                            texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
-                            VExpr* new_node = _pool->add(new 
VcompoundPred(texpr_node));
-                            new_node->add_child(last_expr);
-                            new_node->add_child(vexprs[j]);
-                            last_expr = new_node;
-                            _rf_vexpr_set.insert(vexprs[j]);
-                        }
-                        auto new_vconjunct_ctx_ptr = _pool->add(new 
VExprContext(last_expr));
-                        auto expr_status = 
new_vconjunct_ctx_ptr->prepare(state, row_desc());
-                        // If error occurs in `prepare` or `open` phase, 
discard these runtime
-                        // filters directly.
-                        if (UNLIKELY(!expr_status.OK())) {
-                            LOG(WARNING) << "Something wrong for runtime 
filters: " << expr_status;
-                            vexprs.clear();
-                            break;
-                        }
-                        expr_status = new_vconjunct_ctx_ptr->open(state);
-                        if (UNLIKELY(!expr_status.OK())) {
-                            LOG(WARNING) << "Something wrong for runtime 
filters: " << expr_status;
-                            vexprs.clear();
-                            break;
-                        }
-                        if (_vconjunct_ctx_ptr) {
-                            
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
-                        }
-                        _vconjunct_ctx_ptr.reset(new VExprContext*);
-                        *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+                        _append_rf_into_conjuncts(state, vexprs);
                         _runtime_filter_ready_flag[i] = true;
                     }
                 }
@@ -495,14 +457,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) 
{
 
     if (!vexprs.empty()) {
         if (*scanner->vconjunct_ctx_ptr()) {
-            (*scanner->vconjunct_ctx_ptr())->close(state);
-            *scanner->vconjunct_ctx_ptr() = nullptr;
+            scanner->discard_conjuncts();
         }
         WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, 
scanner->vconjunct_ctx_ptr()),
                       "Something wrong for runtime filters: ");
-        scanner->set_use_pushdown_conjuncts(true);
     }
-    */
 
     std::vector<Block*> blocks;
 
diff --git a/be/src/vec/exec/volap_scanner.cpp 
b/be/src/vec/exec/volap_scanner.cpp
index beb04ab0b8..9218fcac91 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -367,6 +367,9 @@ Status VOlapScanner::close(RuntimeState* state) {
     if (_is_closed) {
         return Status::OK();
     }
+    for (auto& ctx : _stale_vexpr_ctxs) {
+        ctx->close(state);
+    }
     if (_vconjunct_ctx) {
         _vconjunct_ctx->close(state);
     }
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index f36fd55adf..3708b5b45e 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -61,6 +61,12 @@ public:
 
     VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; }
 
+    void discard_conjuncts() {
+        _vconjunct_ctx->mark_as_stale();
+        _stale_vexpr_ctxs.push_back(_vconjunct_ctx);
+        _vconjunct_ctx = nullptr;
+    }
+
     void mark_to_need_to_close() { _need_to_close = true; }
 
     bool need_to_close() { return _need_to_close; }
@@ -143,6 +149,8 @@ private:
     bool _need_to_close = false;
 
     TabletSchemaSPtr _tablet_schema;
+
+    std::vector<VExprContext*> _stale_vexpr_ctxs;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp 
b/be/src/vec/exprs/vbloom_predicate.cpp
index c708efdda0..83e1443ec6 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -19,6 +19,8 @@
 
 #include <string_view>
 
+#include "vec/data_types/data_type_nullable.h"
+
 namespace doris::vectorized {
 
 VBloomPredicate::VBloomPredicate(const TExprNode& node)
@@ -73,8 +75,18 @@ Status VBloomPredicate::execute(VExprContext* context, 
Block* block, int* result
     size_t sz = argument_column->size();
     res_data_column->resize(sz);
     auto ptr = 
((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
-    for (size_t i = 0; i < sz; i++) {
-        ptr[i] = _filter->find(reinterpret_cast<const 
void*>(argument_column->get_data_at(i).data));
+    if 
(WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type))
+                .is_string_or_fixed_string()) {
+        for (size_t i = 0; i < sz; i++) {
+            auto ele = argument_column->get_data_at(i);
+            const StringValue v(ele.data, ele.size);
+            ptr[i] = _filter->find(reinterpret_cast<const void*>(&v));
+        }
+    } else {
+        for (size_t i = 0; i < sz; i++) {
+            ptr[i] = _filter->find(
+                    reinterpret_cast<const 
void*>(argument_column->get_data_at(i).data));
+        }
     }
     if (_data_type->is_nullable()) {
         auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp 
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 2bd353e934..cdc3d2d5dc 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -29,14 +29,20 @@
 namespace doris::vectorized {
 
 VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* 
impl)
-        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), 
_scan_rows(0) {}
+        : VExpr(node),
+          _impl(impl),
+          _always_true(false),
+          _filtered_rows(0),
+          _scan_rows(0),
+          _is_closed(false) {}
 
 VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& 
vexpr)
         : VExpr(vexpr),
           _impl(vexpr._impl),
           _always_true(vexpr._always_true),
           _filtered_rows(vexpr._filtered_rows.load()),
-          _scan_rows(vexpr._scan_rows.load()) {}
+          _scan_rows(vexpr._scan_rows.load()),
+          _is_closed(false) {}
 
 Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const 
RowDescriptor& desc,
                                       VExprContext* context) {
@@ -52,6 +58,10 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, 
VExprContext* context,
 
 void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
                                   FunctionContext::FunctionStateScope scope) {
+    if (_is_closed) {
+        return;
+    }
+    _is_closed = true;
     _impl->close(state, context, scope);
 }
 
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h 
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 91a6bdbcac..e04faba9ef 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -61,5 +61,7 @@ private:
     constexpr static double EXPECTED_FILTER_RATE = 0.2;
 
     std::string _expr_name;
+
+    bool _is_closed;
 };
 } // namespace doris::vectorized
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to