This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d545eb3865c [fix](inverted index) fixed in_list condition not indexed 
on pipelinex (#36565)
d545eb3865c is described below

commit d545eb3865c77d16304c7a0d56003f487d33a5f3
Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com>
AuthorDate: Fri Jun 21 10:14:00 2024 +0800

    [fix](inverted index) fixed in_list condition not indexed on pipelinex 
(#36565)
---
 be/src/exec/olap_utils.h                           |  4 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  9 +++
 be/src/pipeline/exec/scan_operator.cpp             | 93 +++++++++++++++-------
 be/src/pipeline/exec/scan_operator.h               | 23 ++++--
 .../test_index_inlist_fault_injection.out          | 19 +++++
 .../test_index_inlist_fault_injection.groovy       | 93 ++++++++++++++++++++++
 6 files changed, 203 insertions(+), 38 deletions(-)

diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h
index d1a1be81f5d..ddf8562fea1 100644
--- a/be/src/exec/olap_utils.h
+++ b/be/src/exec/olap_utils.h
@@ -117,9 +117,9 @@ inline SQLFilterOp to_olap_filter_type(const std::string& 
function_name, bool op
         return opposite ? FILTER_NOT_IN : FILTER_IN;
     } else if (function_name == "ne") {
         return opposite ? FILTER_IN : FILTER_NOT_IN;
-    } else if (function_name == "in_list") {
+    } else if (function_name == "in") {
         return opposite ? FILTER_NOT_IN : FILTER_IN;
-    } else if (function_name == "not_in_list") {
+    } else if (function_name == "not_in") {
         return opposite ? FILTER_IN : FILTER_NOT_IN;
     } else {
         DCHECK(false) << "Function Name: " << function_name;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 37df15d6939..f0c3f8f4920 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -2403,6 +2403,15 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
         return Status::EndOfFile("no more data in segment");
     }
 
+    DBUG_EXECUTE_IF("segment_iterator._rowid_result_for_index", {
+        for (auto& iter : _rowid_result_for_index) {
+            if (iter.second.first) {
+                return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                        "_rowid_result_for_index exists true");
+            }
+        }
+    })
+
     if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
         if (_non_predicate_columns.empty()) {
             return Status::InternalError("_non_predicate_columns is empty");
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 161a79fb7c1..21f87c68d5d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -994,8 +994,10 @@ void 
ScanLocalState<Derived>::_normalize_compound_predicate(
         auto compound_fn_name = expr->fn().name.function_name;
         auto children_num = expr->children().size();
         for (auto i = 0; i < children_num; ++i) {
-            auto child_expr = expr->children()[i].get();
-            if (TExprNodeType::BINARY_PRED == child_expr->node_type()) {
+            auto* child_expr = expr->children()[i].get();
+            if (TExprNodeType::BINARY_PRED == child_expr->node_type() ||
+                TExprNodeType::IN_PRED == child_expr->node_type() ||
+                TExprNodeType::MATCH_PRED == child_expr->node_type()) {
                 SlotDescriptor* slot = nullptr;
                 ColumnValueRangeType* range_on_slot = nullptr;
                 if (_is_predicate_acting_on_slot(child_expr, 
in_predicate_checker, &slot,
@@ -1010,30 +1012,16 @@ void 
ScanLocalState<Derived>::_normalize_compound_predicate(
                                     value_range.mark_runtime_filter_predicate(
                                             _is_runtime_filter_predicate);
                                 }};
-                                
static_cast<void>(_normalize_binary_in_compound_predicate(
-                                        child_expr, expr_ctx, slot, 
value_range, pdt));
-                            },
-                            active_range);
-
-                    _compound_value_ranges.emplace_back(active_range);
-                }
-            } else if (TExprNodeType::MATCH_PRED == child_expr->node_type()) {
-                SlotDescriptor* slot = nullptr;
-                ColumnValueRangeType* range_on_slot = nullptr;
-                if (_is_predicate_acting_on_slot(child_expr, 
in_predicate_checker, &slot,
-                                                 &range_on_slot) ||
-                    _is_predicate_acting_on_slot(child_expr, 
eq_predicate_checker, &slot,
-                                                 &range_on_slot)) {
-                    ColumnValueRangeType active_range =
-                            *range_on_slot; // copy, in order not to affect 
the range in the _colname_to_value_range
-                    std::visit(
-                            [&](auto& value_range) {
-                                Defer mark_runtime_filter_flag {[&]() {
-                                    value_range.mark_runtime_filter_predicate(
-                                            _is_runtime_filter_predicate);
-                                }};
-                                
static_cast<void>(_normalize_match_in_compound_predicate(
-                                        child_expr, expr_ctx, slot, 
value_range, pdt));
+                                if (TExprNodeType::BINARY_PRED == 
child_expr->node_type()) {
+                                    
static_cast<void>(_normalize_binary_compound_predicate(
+                                            child_expr, expr_ctx, slot, 
value_range, pdt));
+                                } else if (TExprNodeType::IN_PRED == 
child_expr->node_type()) {
+                                    
static_cast<void>(_normalize_in_and_not_in_compound_predicate(
+                                            child_expr, expr_ctx, slot, 
value_range, pdt));
+                                } else {
+                                    
static_cast<void>(_normalize_match_compound_predicate(
+                                            child_expr, expr_ctx, slot, 
value_range, pdt));
+                                }
                             },
                             active_range);
 
@@ -1050,7 +1038,7 @@ void 
ScanLocalState<Derived>::_normalize_compound_predicate(
 
 template <typename Derived>
 template <PrimitiveType T>
-Status ScanLocalState<Derived>::_normalize_binary_in_compound_predicate(
+Status ScanLocalState<Derived>::_normalize_binary_compound_predicate(
         vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, 
SlotDescriptor* slot,
         ColumnValueRange<T>& range, PushDownType* pdt) {
     DCHECK(expr->children().size() == 2);
@@ -1107,7 +1095,56 @@ Status 
ScanLocalState<Derived>::_normalize_binary_in_compound_predicate(
 
 template <typename Derived>
 template <PrimitiveType T>
-Status ScanLocalState<Derived>::_normalize_match_in_compound_predicate(
+Status ScanLocalState<Derived>::_normalize_in_and_not_in_compound_predicate(
+        vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, 
SlotDescriptor* slot,
+        ColumnValueRange<T>& range, PushDownType* pdt) {
+    if (TExprNodeType::IN_PRED == expr->node_type()) {
+        std::string fn_name = expr->op() == TExprOpcode::type::FILTER_IN ? 
"in" : "not_in";
+
+        HybridSetBase::IteratorBase* iter = nullptr;
+        auto hybrid_set = expr->get_set_func();
+
+        if (hybrid_set != nullptr) {
+            if (hybrid_set->size() <=
+                _parent->cast<typename 
Derived::Parent>()._max_pushdown_conditions_per_column) {
+                iter = hybrid_set->begin();
+            } else {
+                _filter_predicates.in_filters.emplace_back(slot->col_name(), 
expr->get_set_func());
+                *pdt = PushDownType::ACCEPTABLE;
+                return Status::OK();
+            }
+        } else {
+            vectorized::VInPredicate* pred = 
static_cast<vectorized::VInPredicate*>(expr);
+
+            vectorized::InState* state = 
reinterpret_cast<vectorized::InState*>(
+                    expr_ctx->fn_context(pred->fn_context_index())
+                            
->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+
+            if (!state->use_set) {
+                return Status::OK();
+            }
+
+            iter = state->hybrid_set->begin();
+        }
+
+        while (iter->has_next()) {
+            if (nullptr == iter->get_value()) {
+                iter->next();
+                continue;
+            }
+            auto* value = const_cast<void*>(iter->get_value());
+            RETURN_IF_ERROR(_change_value_range<false>(
+                    range, value, 
ColumnValueRange<T>::add_compound_value_range, fn_name, 0));
+            iter->next();
+        }
+        *pdt = PushDownType::ACCEPTABLE;
+    }
+    return Status::OK();
+}
+
+template <typename Derived>
+template <PrimitiveType T>
+Status ScanLocalState<Derived>::_normalize_match_compound_predicate(
         vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, 
SlotDescriptor* slot,
         ColumnValueRange<T>& range, PushDownType* pdt) {
     DCHECK(expr->children().size() == 2);
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 84db26da051..6c2c3e80346 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -300,16 +300,23 @@ protected:
                                      vectorized::VExprSPtr&)>& 
eq_predicate_checker);
 
     template <PrimitiveType T>
-    Status _normalize_binary_in_compound_predicate(vectorized::VExpr* expr,
-                                                   vectorized::VExprContext* 
expr_ctx,
-                                                   SlotDescriptor* slot, 
ColumnValueRange<T>& range,
-                                                   PushDownType* pdt);
+    Status _normalize_binary_compound_predicate(vectorized::VExpr* expr,
+                                                vectorized::VExprContext* 
expr_ctx,
+                                                SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                                PushDownType* pdt);
 
     template <PrimitiveType T>
-    Status _normalize_match_in_compound_predicate(vectorized::VExpr* expr,
-                                                  vectorized::VExprContext* 
expr_ctx,
-                                                  SlotDescriptor* slot, 
ColumnValueRange<T>& range,
-                                                  PushDownType* pdt);
+    Status _normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr,
+                                                       
vectorized::VExprContext* expr_ctx,
+                                                       SlotDescriptor* slot,
+                                                       ColumnValueRange<T>& 
range,
+                                                       PushDownType* pdt);
+
+    template <PrimitiveType T>
+    Status _normalize_match_compound_predicate(vectorized::VExpr* expr,
+                                               vectorized::VExprContext* 
expr_ctx,
+                                               SlotDescriptor* slot, 
ColumnValueRange<T>& range,
+                                               PushDownType* pdt);
 
     template <PrimitiveType T>
     Status _normalize_is_null_predicate(vectorized::VExpr* expr, 
vectorized::VExprContext* expr_ctx,
diff --git 
a/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out 
b/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out
new file mode 100644
index 00000000000..9fbd1c8e252
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+8
+
+-- !sql --
+996
+
+-- !sql --
+210
+
+-- !sql --
+8
+
+-- !sql --
+998
+
+-- !sql --
+208
+
diff --git 
a/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy
new file mode 100644
index 00000000000..e0c340c0aa9
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_index_inlist_fault_injection", "nonConcurrent") {
+    // define a sql table
+    def indexTbName = "test_index_inlist_fault_injection"
+
+    sql "DROP TABLE IF EXISTS ${indexTbName}"
+    sql """
+      CREATE TABLE ${indexTbName} (
+        `@timestamp` int(11) NULL COMMENT "",
+        `clientip` varchar(20) NULL COMMENT "",
+        `request` text NULL COMMENT "",
+        `status` int(11) NULL COMMENT "",
+        `size` int(11) NULL COMMENT "",
+        INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
+        INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = 
"english", "support_phrase" = "true") COMMENT '',
+        INDEX status_idx (`status`) USING INVERTED COMMENT ''
+      ) ENGINE=OLAP
+      DUPLICATE KEY(`@timestamp`)
+      COMMENT "OLAP"
+      DISTRIBUTED BY RANDOM BUCKETS 1
+      PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "true"
+      );
+    """
+
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, 
file_name, ignore_failure=false,
+                        expected_succ_rows = -1, load_to_single_tablet = 
'true' ->
+
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // if declared a check callback, the default check condition will 
ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                       if (ignore_failure && expected_succ_rows < 0) { return }
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+            }
+        }
+    }
+
+    try {
+      load_httplogs_data.call(indexTbName, 
'test_index_inlist_fault_injection', 'true', 'json', 'documents-1000.json')
+
+      sql "sync"
+
+      try {
+        
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._rowid_result_for_index")
+
+        qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ 
count() from ${indexTbName} where clientip in ('40.135.0.0', '232.0.0.0', 
'26.1.0.0'); """
+        qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ 
count() from ${indexTbName} where status in (1, 304, 200); """
+        qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ 
count() from ${indexTbName} where (request match 'hm' or clientip in 
('40.135.0.0', '232.0.0.0', '26.1.0.0')); """
+        qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ 
count() from ${indexTbName} where (request match 'hm' and clientip in 
('40.135.0.0', '232.0.0.0', '26.1.0.0')); """
+        qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ 
count() from ${indexTbName} where (request match 'hm' or status in (1, 304, 
200)); """
+        qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ 
count() from ${indexTbName} where (request match 'hm' and status in (1, 304, 
200)); """
+
+      } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("segment_iterator._rowid_result_for_index")
+      }
+    } finally {
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to