This is an automated email from the ASF dual-hosted git repository. jianliangqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d545eb3865c [fix](inverted index) fixed in_list condition not indexed on pipelinex (#36565) d545eb3865c is described below commit d545eb3865c77d16304c7a0d56003f487d33a5f3 Author: zzzxl <33418555+zzzxl1...@users.noreply.github.com> AuthorDate: Fri Jun 21 10:14:00 2024 +0800 [fix](inverted index) fixed in_list condition not indexed on pipelinex (#36565) --- be/src/exec/olap_utils.h | 4 +- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 9 +++ be/src/pipeline/exec/scan_operator.cpp | 93 +++++++++++++++------- be/src/pipeline/exec/scan_operator.h | 23 ++++-- .../test_index_inlist_fault_injection.out | 19 +++++ .../test_index_inlist_fault_injection.groovy | 93 ++++++++++++++++++++++ 6 files changed, 203 insertions(+), 38 deletions(-) diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h index d1a1be81f5d..ddf8562fea1 100644 --- a/be/src/exec/olap_utils.h +++ b/be/src/exec/olap_utils.h @@ -117,9 +117,9 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool op return opposite ? FILTER_NOT_IN : FILTER_IN; } else if (function_name == "ne") { return opposite ? FILTER_IN : FILTER_NOT_IN; - } else if (function_name == "in_list") { + } else if (function_name == "in") { return opposite ? FILTER_NOT_IN : FILTER_IN; - } else if (function_name == "not_in_list") { + } else if (function_name == "not_in") { return opposite ? FILTER_IN : FILTER_NOT_IN; } else { DCHECK(false) << "Function Name: " << function_name; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 37df15d6939..f0c3f8f4920 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -2403,6 +2403,15 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { return Status::EndOfFile("no more data in segment"); } + DBUG_EXECUTE_IF("segment_iterator._rowid_result_for_index", { + for (auto& iter : _rowid_result_for_index) { + if (iter.second.first) { + return Status::Error<ErrorCode::INTERNAL_ERROR>( + "_rowid_result_for_index exists true"); + } + } + }) + if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { if (_non_predicate_columns.empty()) { return Status::InternalError("_non_predicate_columns is empty"); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 161a79fb7c1..21f87c68d5d 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -994,8 +994,10 @@ void ScanLocalState<Derived>::_normalize_compound_predicate( auto compound_fn_name = expr->fn().name.function_name; auto children_num = expr->children().size(); for (auto i = 0; i < children_num; ++i) { - auto child_expr = expr->children()[i].get(); - if (TExprNodeType::BINARY_PRED == child_expr->node_type()) { + auto* child_expr = expr->children()[i].get(); + if (TExprNodeType::BINARY_PRED == child_expr->node_type() || + TExprNodeType::IN_PRED == child_expr->node_type() || + TExprNodeType::MATCH_PRED == child_expr->node_type()) { SlotDescriptor* slot = nullptr; ColumnValueRangeType* range_on_slot = nullptr; if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot, @@ -1010,30 +1012,16 @@ void ScanLocalState<Derived>::_normalize_compound_predicate( value_range.mark_runtime_filter_predicate( _is_runtime_filter_predicate); }}; - static_cast<void>(_normalize_binary_in_compound_predicate( - child_expr, expr_ctx, slot, value_range, pdt)); - }, - active_range); - - _compound_value_ranges.emplace_back(active_range); - } - } else if (TExprNodeType::MATCH_PRED == child_expr->node_type()) { - SlotDescriptor* slot = nullptr; - ColumnValueRangeType* range_on_slot = nullptr; - if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot, - &range_on_slot) || - _is_predicate_acting_on_slot(child_expr, eq_predicate_checker, &slot, - &range_on_slot)) { - ColumnValueRangeType active_range = - *range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range - std::visit( - [&](auto& value_range) { - Defer mark_runtime_filter_flag {[&]() { - value_range.mark_runtime_filter_predicate( - _is_runtime_filter_predicate); - }}; - static_cast<void>(_normalize_match_in_compound_predicate( - child_expr, expr_ctx, slot, value_range, pdt)); + if (TExprNodeType::BINARY_PRED == child_expr->node_type()) { + static_cast<void>(_normalize_binary_compound_predicate( + child_expr, expr_ctx, slot, value_range, pdt)); + } else if (TExprNodeType::IN_PRED == child_expr->node_type()) { + static_cast<void>(_normalize_in_and_not_in_compound_predicate( + child_expr, expr_ctx, slot, value_range, pdt)); + } else { + static_cast<void>(_normalize_match_compound_predicate( + child_expr, expr_ctx, slot, value_range, pdt)); + } }, active_range); @@ -1050,7 +1038,7 @@ void ScanLocalState<Derived>::_normalize_compound_predicate( template <typename Derived> template <PrimitiveType T> -Status ScanLocalState<Derived>::_normalize_binary_in_compound_predicate( +Status ScanLocalState<Derived>::_normalize_binary_compound_predicate( vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, ColumnValueRange<T>& range, PushDownType* pdt) { DCHECK(expr->children().size() == 2); @@ -1107,7 +1095,56 @@ Status ScanLocalState<Derived>::_normalize_binary_in_compound_predicate( template <typename Derived> template <PrimitiveType T> -Status ScanLocalState<Derived>::_normalize_match_in_compound_predicate( +Status ScanLocalState<Derived>::_normalize_in_and_not_in_compound_predicate( + vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, + ColumnValueRange<T>& range, PushDownType* pdt) { + if (TExprNodeType::IN_PRED == expr->node_type()) { + std::string fn_name = expr->op() == TExprOpcode::type::FILTER_IN ? "in" : "not_in"; + + HybridSetBase::IteratorBase* iter = nullptr; + auto hybrid_set = expr->get_set_func(); + + if (hybrid_set != nullptr) { + if (hybrid_set->size() <= + _parent->cast<typename Derived::Parent>()._max_pushdown_conditions_per_column) { + iter = hybrid_set->begin(); + } else { + _filter_predicates.in_filters.emplace_back(slot->col_name(), expr->get_set_func()); + *pdt = PushDownType::ACCEPTABLE; + return Status::OK(); + } + } else { + vectorized::VInPredicate* pred = static_cast<vectorized::VInPredicate*>(expr); + + vectorized::InState* state = reinterpret_cast<vectorized::InState*>( + expr_ctx->fn_context(pred->fn_context_index()) + ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); + + if (!state->use_set) { + return Status::OK(); + } + + iter = state->hybrid_set->begin(); + } + + while (iter->has_next()) { + if (nullptr == iter->get_value()) { + iter->next(); + continue; + } + auto* value = const_cast<void*>(iter->get_value()); + RETURN_IF_ERROR(_change_value_range<false>( + range, value, ColumnValueRange<T>::add_compound_value_range, fn_name, 0)); + iter->next(); + } + *pdt = PushDownType::ACCEPTABLE; + } + return Status::OK(); +} + +template <typename Derived> +template <PrimitiveType T> +Status ScanLocalState<Derived>::_normalize_match_compound_predicate( vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, ColumnValueRange<T>& range, PushDownType* pdt) { DCHECK(expr->children().size() == 2); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 84db26da051..6c2c3e80346 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -300,16 +300,23 @@ protected: vectorized::VExprSPtr&)>& eq_predicate_checker); template <PrimitiveType T> - Status _normalize_binary_in_compound_predicate(vectorized::VExpr* expr, - vectorized::VExprContext* expr_ctx, - SlotDescriptor* slot, ColumnValueRange<T>& range, - PushDownType* pdt); + Status _normalize_binary_compound_predicate(vectorized::VExpr* expr, + vectorized::VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange<T>& range, + PushDownType* pdt); template <PrimitiveType T> - Status _normalize_match_in_compound_predicate(vectorized::VExpr* expr, - vectorized::VExprContext* expr_ctx, - SlotDescriptor* slot, ColumnValueRange<T>& range, - PushDownType* pdt); + Status _normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr, + vectorized::VExprContext* expr_ctx, + SlotDescriptor* slot, + ColumnValueRange<T>& range, + PushDownType* pdt); + + template <PrimitiveType T> + Status _normalize_match_compound_predicate(vectorized::VExpr* expr, + vectorized::VExprContext* expr_ctx, + SlotDescriptor* slot, ColumnValueRange<T>& range, + PushDownType* pdt); template <PrimitiveType T> Status _normalize_is_null_predicate(vectorized::VExpr* expr, vectorized::VExprContext* expr_ctx, diff --git a/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out new file mode 100644 index 00000000000..9fbd1c8e252 --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_index_inlist_fault_injection.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +8 + +-- !sql -- +996 + +-- !sql -- +210 + +-- !sql -- +8 + +-- !sql -- +998 + +-- !sql -- +208 + diff --git a/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy new file mode 100644 index 00000000000..e0c340c0aa9 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_index_inlist_fault_injection.groovy @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_index_inlist_fault_injection", "nonConcurrent") { + // define a sql table + def indexTbName = "test_index_inlist_fault_injection" + + sql "DROP TABLE IF EXISTS ${indexTbName}" + sql """ + CREATE TABLE ${indexTbName} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` varchar(20) NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `size` int(11) NULL COMMENT "", + INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '', + INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '', + INDEX status_idx (`status`) USING INVERTED COMMENT '' + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true" + ); + """ + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1, load_to_single_tablet = 'true' -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + } + } + } + + try { + load_httplogs_data.call(indexTbName, 'test_index_inlist_fault_injection', 'true', 'json', 'documents-1000.json') + + sql "sync" + + try { + GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._rowid_result_for_index") + + qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ count() from ${indexTbName} where clientip in ('40.135.0.0', '232.0.0.0', '26.1.0.0'); """ + qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ count() from ${indexTbName} where status in (1, 304, 200); """ + qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ count() from ${indexTbName} where (request match 'hm' or clientip in ('40.135.0.0', '232.0.0.0', '26.1.0.0')); """ + qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ count() from ${indexTbName} where (request match 'hm' and clientip in ('40.135.0.0', '232.0.0.0', '26.1.0.0')); """ + qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ count() from ${indexTbName} where (request match 'hm' or status in (1, 304, 200)); """ + qt_sql """ select /*+ SET_VAR(inverted_index_skip_threshold = 0) */ count() from ${indexTbName} where (request match 'hm' and status in (1, 304, 200)); """ + + } finally { + GetDebugPoint().disableDebugPointForAllBEs("segment_iterator._rowid_result_for_index") + } + } finally { + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org